KAFKA-18399 Remove ZooKeeper from KafkaApis (4/N): OFFSET_COMMIT and OFFSET_FETCH (#18461)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2025-01-12 20:54:10 +08:00 committed by GitHub
parent 81b1508acb
commit 3cf2e45dc4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 0 additions and 109 deletions

View File

@@ -326,14 +326,6 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedTopicsRequest.isEmpty) {
requestHelper.sendMaybeThrottle(request, responseBuilder.build())
CompletableFuture.completedFuture(())
} else if (request.header.apiVersion == 0) {
// For version 0, always store offsets in ZK.
commitOffsetsToZookeeper(
request,
offsetCommitRequest,
authorizedTopicsRequest,
responseBuilder
)
} else {
// For version > 0, store offsets in Coordinator.
commitOffsetsToCoordinator(
@@ -347,41 +339,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
/**
 * Persists the offsets of a version-0 OffsetCommit request directly in ZooKeeper
 * (pre-coordinator storage path), records a per-partition result in the response
 * builder, and sends the (possibly throttled) response.
 *
 * Errors are captured per partition via `Errors.forException` so one failing
 * partition does not abort the rest of the commit.
 */
private def commitOffsetsToZookeeper(
  request: RequestChannel.Request,
  offsetCommitRequest: OffsetCommitRequest,
  authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
  responseBuilder: OffsetCommitResponse.Builder
): CompletableFuture[Unit] = {
  // Version 0 is only legal on a ZK-backed cluster; fail fast in KRaft mode.
  val zkSupport = metadataSupport.requireZkOrThrow(
    KafkaApis.unsupported("Version 0 offset commit requests"))
  authorizedTopicsRequest.foreach { topicData =>
    topicData.partitions.forEach { partitionData =>
      val result =
        try {
          val metadata = partitionData.committedMetadata
          if (metadata != null && metadata.length > config.groupCoordinatorConfig.offsetMetadataMaxSize) {
            // Oversized metadata is rejected without touching ZK.
            Errors.OFFSET_METADATA_TOO_LARGE
          } else {
            zkSupport.zkClient.setOrCreateConsumerOffset(
              offsetCommitRequest.data.groupId,
              new TopicPartition(topicData.name, partitionData.partitionIndex),
              partitionData.committedOffset
            )
            Errors.NONE
          }
        } catch {
          // Deliberately broad: any failure is folded into the per-partition error code.
          case e: Throwable => Errors.forException(e)
        }
      responseBuilder.addPartition(topicData.name, partitionData.partitionIndex, result)
    }
  }
  requestHelper.sendMaybeThrottle(request, responseBuilder.build())
  CompletableFuture.completedFuture[Unit](())
}
private def commitOffsetsToCoordinator(
request: RequestChannel.Request,
offsetCommitRequest: OffsetCommitRequest,
@@ -1048,61 +1005,6 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an offset fetch request
*/
/**
 * Handle an offset fetch request.
 *
 * Version 0 reads committed offsets from ZooKeeper; every later version is
 * served by the group coordinator.
 */
def handleOffsetFetchRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
  request.header.apiVersion match {
    case 0 => handleOffsetFetchRequestFromZookeeper(request)
    case _ => handleOffsetFetchRequestFromCoordinator(request)
  }
}
/**
 * Serves a version-0 OffsetFetch request by reading committed offsets from
 * ZooKeeper (the pre-coordinator storage path).
 *
 * Authorization is two-level: the whole request is rejected with
 * GROUP_AUTHORIZATION_FAILED unless the principal may DESCRIBE the group, and
 * each partition additionally requires DESCRIBE on its topic. The response is
 * built synchronously, so this always returns an already-completed future.
 */
private def handleOffsetFetchRequestFromZookeeper(request: RequestChannel.Request): CompletableFuture[Unit] = {
  val header = request.header
  val offsetFetchRequest = request.body[OffsetFetchRequest]
  // Builds the full response for a given throttle time; invoked by the throttling helper below.
  def createResponse(requestThrottleMs: Int): AbstractResponse = {
    val offsetFetchResponse =
      // reject the request if not authorized to the group
      if (!authHelper.authorize(request.context, DESCRIBE, GROUP, offsetFetchRequest.groupId))
        offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
      else {
        // Version 0 is only legal on a ZK-backed cluster; fail fast in KRaft mode.
        val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch requests"))
        // Split requested partitions by DESCRIBE access on their topics.
        val (authorizedPartitions, unauthorizedPartitions) = partitionByAuthorized(
          offsetFetchRequest.partitions.asScala, request.context)
        // version 0 reads offsets from ZK
        val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
          try {
            // Unknown-to-metadata partitions get UNKNOWN_PARTITION rather than an error code.
            if (!metadataCache.contains(topicPartition))
              (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
            else {
              val payloadOpt = zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
              payloadOpt match {
                case Some(payload) =>
                  // ZK stores only the offset for v0 — no leader epoch, no metadata.
                  (topicPartition, new OffsetFetchResponse.PartitionData(payload,
                    Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.NONE))
                case None =>
                  (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
              }
            }
          } catch {
            // Per-partition failure is reported in that partition's slot; other partitions proceed.
            case e: Throwable =>
              (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
          }
        }.toMap
        val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
        // Top-level error is NONE; all failures are carried per partition.
        new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
      }
    trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
    offsetFetchResponse
  }
  requestHelper.sendResponseMaybeThrottle(request, createResponse)
  CompletableFuture.completedFuture[Unit](())
}
private def handleOffsetFetchRequestFromCoordinator(request: RequestChannel.Request): CompletableFuture[Unit] = {
val offsetFetchRequest = request.body[OffsetFetchRequest]
val groups = offsetFetchRequest.groups()
val requireStable = offsetFetchRequest.requireStable()
@@ -1213,13 +1115,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
/**
 * Splits the given topic-partitions into (authorized, unauthorized) based on
 * whether the request principal has DESCRIBE access to each partition's topic.
 */
private def partitionByAuthorized(
  seq: Seq[TopicPartition],
  context: RequestContext
): (Seq[TopicPartition], Seq[TopicPartition]) = {
  authHelper.partitionSeqByAuthorized(context, DESCRIBE, TOPIC, seq)(tp => tp.topic)
}
def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
val version = request.header.apiVersion
if (version < 4) {
@@ -4357,8 +4252,4 @@ object KafkaApis {
/**
 * Builds the exception raised when a request that must be handled by the
 * active controller reaches a broker in a Raft-based (KRaft) cluster.
 */
private[server] def shouldAlwaysForward(request: RequestChannel.Request): Exception = {
  val message =
    s"Should always be forwarded to the Active Controller when using a Raft-based metadata quorum: ${request.header.apiKey}"
  new UnsupportedVersionException(message)
}
/**
 * Builds the exception raised for request variants (e.g. old API versions)
 * that have no equivalent on a Raft-based (KRaft) cluster.
 */
private def unsupported(text: String): Exception = {
  val message = s"Unsupported when using a Raft-based metadata quorum: $text"
  new UnsupportedVersionException(message)
}
}