MINOR: Small refactor in KafkaApis.handleHeartbeatRequest (#12978)

This is a small follow-up to https://github.com/apache/kafka/pull/12848. 

Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Jacot 2022-12-13 09:18:05 +01:00 committed by GitHub
parent 3541d5ab18
commit 2935a52073
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 4 additions and 11 deletions

View File

@ -1739,21 +1739,14 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val heartbeatRequest = request.body[HeartbeatRequest]
def sendResponse(response: AbstractResponse): Unit = {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
response.maybeSetThrottleTimeMs(requestThrottleMs)
response
})
}
if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
sendResponse(heartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
requestHelper.sendMaybeThrottle(request, heartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, READ, GROUP, heartbeatRequest.data.groupId)) {
sendResponse(heartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
requestHelper.sendMaybeThrottle(request, heartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
newGroupCoordinator.heartbeat(
@ -1761,9 +1754,9 @@ class KafkaApis(val requestChannel: RequestChannel,
heartbeatRequest.data
).handle[Unit] { (response, exception) =>
if (exception != null) {
sendResponse(heartbeatRequest.getErrorResponse(exception))
requestHelper.sendMaybeThrottle(request, heartbeatRequest.getErrorResponse(exception))
} else {
sendResponse(new HeartbeatResponse(response))
requestHelper.sendMaybeThrottle(request, new HeartbeatResponse(response))
}
}
}