diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c8be15a58a5..dd37213b1cb 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1739,21 +1739,14 @@ class KafkaApis(val requestChannel: RequestChannel, def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val heartbeatRequest = request.body[HeartbeatRequest] - def sendResponse(response: AbstractResponse): Unit = { - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { - response.maybeSetThrottleTimeMs(requestThrottleMs) - response - }) - } - if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. - sendResponse(heartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + requestHelper.sendMaybeThrottle(request, heartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } else if (!authHelper.authorize(request.context, READ, GROUP, heartbeatRequest.data.groupId)) { - sendResponse(heartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + requestHelper.sendMaybeThrottle(request, heartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) } else { newGroupCoordinator.heartbeat( @@ -1761,9 +1754,9 @@ class KafkaApis(val requestChannel: RequestChannel, heartbeatRequest.data ).handle[Unit] { (response, exception) => if (exception != null) { - sendResponse(heartbeatRequest.getErrorResponse(exception)) + requestHelper.sendMaybeThrottle(request, heartbeatRequest.getErrorResponse(exception)) } else { - sendResponse(new HeartbeatResponse(response)) + requestHelper.sendMaybeThrottle(request, new HeartbeatResponse(response)) } } }