KAFKA-19294: Fix BrokerLifecycleManager RPC timeouts (#19745)

Previously, we could wait for up to half of the broker session timeout
for an RPC to complete, and then delay by up to another half of the
broker session timeout before retrying. Taken together, these two delays
could cause brokers to erroneously miss heartbeats.
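
As a rough worked example (a sketch only, assuming the default KRaft
broker.session.timeout.ms of 9000 ms; the constants below are illustrative,
not the actual code path):

```scala
// Worst-case arithmetic for the old behavior, assuming the default KRaft
// broker.session.timeout.ms of 9000 ms.
val brokerSessionTimeoutMs = 9000L

// Before this patch, the heartbeat channel's RPC deadline was half the session
// timeout (BrokerServer.scala), and the retry backoff was also capped at half
// the session timeout (resendExponentialBackoff).
val maxRpcWaitMs      = brokerSessionTimeoutMs / 2 // 4500 ms
val maxRetryBackoffMs = brokerSessionTimeoutMs / 2 // 4500 ms

// One slow heartbeat plus one maximal backoff could consume the entire session
// timeout, so the controller could fence the broker before the next attempt.
val oldWorstCaseMs = maxRpcWaitMs + maxRetryBackoffMs // 9000 ms == session timeout
```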

This change removes the exponential backoff for heartbeats sent from the
broker to the controller. The load caused by heartbeats is not heavy, and
controllers can easily time out heartbeats when their event queue grows
too long. Additionally, we now set the maximum RPC time to the length of
the heartbeat period. This minimizes the impact of heavy load.
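
Under the same assumed defaults (broker.heartbeat.interval.ms = 2000), a
comparable sketch of the new bound:

```scala
// With this change, both the RPC deadline and the delay before the next attempt
// after a failure are tied to the heartbeat interval rather than the session timeout.
val brokerHeartbeatIntervalMs = 2000L // broker.heartbeat.interval.ms default
val brokerSessionTimeoutMs    = 9000L // broker.session.timeout.ms default

// Worst case between heartbeat attempts: one timed-out RPC plus one retry delay.
val newWorstCaseMs = brokerHeartbeatIntervalMs + brokerHeartbeatIntervalMs // 4000 ms

// Well under the session timeout, so a single slow RPC no longer risks fencing.
assert(newWorstCaseMs < brokerSessionTimeoutMs)
```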

Reviewers: José Armando García Sancio <jsancio@apache.org>, David Arthur <mumrah@gmail.com>
Colin Patrick McCabe authored 2025-06-24 16:23:25 -07:00, committed by GitHub
parent dcc9320bf6
commit 6b2013a001
2 changed files with 4 additions and 19 deletions

core/src/main/scala/kafka/server/BrokerLifecycleManager.scala

@@ -28,7 +28,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.queue.EventQueue.DeadlineFunction
-import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
+import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
@@ -89,18 +89,6 @@ class BrokerLifecycleManager(
   private val initialTimeoutNs =
     MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs.longValue())

-  /**
-   * The exponential backoff to use for resending communication.
-   */
-  private val resendExponentialBackoff =
-    new ExponentialBackoff(100, 2, config.brokerSessionTimeoutMs.toLong / 2, 0.02)
-
-  /**
-   * The number of times we've tried and failed to communicate. This variable can only be
-   * read or written from the BrokerToControllerRequestThread.
-   */
-  private var failedAttempts = 0L
-
   /**
    * The broker incarnation ID. This ID uniquely identifies each time we start the broker
    */
@@ -440,7 +428,6 @@
       val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse]
       val errorCode = Errors.forCode(message.data().errorCode())
       if (errorCode == Errors.NONE) {
-        failedAttempts = 0
         _brokerEpoch = message.data().brokerEpoch()
         registered = true
         initialRegistrationSucceeded = true
@@ -514,7 +501,6 @@
       val errorCode = Errors.forCode(message.data().errorCode())
       if (errorCode == Errors.NONE) {
         val responseData = message.data()
-        failedAttempts = 0
         currentOfflineDirs.foreach(cur => offlineDirs.put(cur, true))
         _state match {
           case BrokerState.STARTING =>
@@ -577,10 +563,9 @@
   }

   private def scheduleNextCommunicationAfterFailure(): Unit = {
-    val delayMs = resendExponentialBackoff.backoff(failedAttempts)
-    failedAttempts = failedAttempts + 1
     nextSchedulingShouldBeImmediate = false // never immediately reschedule after a failure
-    scheduleNextCommunication(NANOSECONDS.convert(delayMs, MILLISECONDS))
+    scheduleNextCommunication(NANOSECONDS.convert(
+      config.brokerHeartbeatIntervalMs.longValue(), MILLISECONDS))
   }

   private def scheduleNextCommunicationAfterSuccess(): Unit = {

core/src/main/scala/kafka/server/BrokerServer.scala

@@ -412,7 +412,7 @@ class BrokerServer(
       config,
       "heartbeat",
       s"broker-${config.nodeId}-",
-      config.brokerSessionTimeoutMs / 2 // KAFKA-14392
+      config.brokerHeartbeatIntervalMs
     )
     lifecycleManager.start(
       () => sharedServer.loader.lastAppliedOffset(),