KAFKA-17022 Fix error-prone in KafkaApis#handleFetchRequest (#16455)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TingIāu "Ting" Kì 2024-06-28 14:53:07 +08:00 committed by GitHub
parent 9be27e715a
commit e57cbe0346
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 8 additions and 9 deletions

View File

@ -936,9 +936,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
erroneous.foreach { case (tp, data) => partitions.put(tp, data) } erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
var unconvertedFetchResponse: FetchResponse = null def createResponse(throttleTimeMs: Int, unconvertedFetchResponse: FetchResponse): FetchResponse = {
def createResponse(throttleTimeMs: Int): FetchResponse = {
// Down-convert messages for each partition if required // Down-convert messages for each partition if required
val convertedData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] val convertedData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
unconvertedFetchResponse.data().responses().forEach { topicResponse => unconvertedFetchResponse.data().responses().forEach { topicResponse =>
@ -982,13 +980,13 @@ class KafkaApis(val requestChannel: RequestChannel,
if (fetchRequest.isFromFollower) { if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now. // We've already evaluated against the quota and are good to go. Just need to record it now.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions) val unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
val responseSize = KafkaApis.sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader) val responseSize = KafkaApis.sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader)
quotas.leader.record(responseSize) quotas.leader.record(responseSize)
val responsePartitionsSize = unconvertedFetchResponse.data().responses().stream().mapToInt(_.partitions().size()).sum() val responsePartitionsSize = unconvertedFetchResponse.data().responses().stream().mapToInt(_.partitions().size()).sum()
trace(s"Sending Fetch response with partitions.size=$responsePartitionsSize, " + trace(s"Sending Fetch response with partitions.size=$responsePartitionsSize, " +
s"metadata=${unconvertedFetchResponse.sessionId}") s"metadata=${unconvertedFetchResponse.sessionId}")
requestHelper.sendResponseExemptThrottle(request, createResponse(0), Some(updateConversionStats)) requestHelper.sendResponseExemptThrottle(request, createResponse(0, unconvertedFetchResponse), Some(updateConversionStats))
} else { } else {
// Fetch size used to determine throttle time is calculated before any down conversions. // Fetch size used to determine throttle time is calculated before any down conversions.
// This may be slightly different from the actual response size. But since down conversions // This may be slightly different from the actual response size. But since down conversions
@ -1003,7 +1001,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs) val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)
val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs) val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
if (maxThrottleTimeMs > 0) { val unconvertedFetchResponse = if (maxThrottleTimeMs > 0) {
request.apiThrottleTimeMs = maxThrottleTimeMs request.apiThrottleTimeMs = maxThrottleTimeMs
// Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value // Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
// from the fetch quota because we are going to return an empty response. // from the fetch quota because we are going to return an empty response.
@ -1014,17 +1012,18 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.throttle(quotas.request, request, requestThrottleTimeMs) requestHelper.throttle(quotas.request, request, requestThrottleTimeMs)
} }
// If throttling is required, return an empty response. // If throttling is required, return an empty response.
unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs) fetchContext.getThrottledResponse(maxThrottleTimeMs)
} else { } else {
// Get the actual response. This will update the fetch context. // Get the actual response. This will update the fetch context.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions) val unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
val responsePartitionsSize = unconvertedFetchResponse.data().responses().stream().mapToInt(_.partitions().size()).sum() val responsePartitionsSize = unconvertedFetchResponse.data().responses().stream().mapToInt(_.partitions().size()).sum()
trace(s"Sending Fetch response with partitions.size=$responsePartitionsSize, " + trace(s"Sending Fetch response with partitions.size=$responsePartitionsSize, " +
s"metadata=${unconvertedFetchResponse.sessionId}") s"metadata=${unconvertedFetchResponse.sessionId}")
unconvertedFetchResponse
} }
// Send the response immediately. // Send the response immediately.
requestChannel.sendResponse(request, createResponse(maxThrottleTimeMs), Some(updateConversionStats)) requestChannel.sendResponse(request, createResponse(maxThrottleTimeMs, unconvertedFetchResponse), Some(updateConversionStats))
} }
} }