diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala index ae3a5d8c3c7..2368ebc21cc 100644 --- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala +++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse} import org.apache.kafka.metadata.{BrokerState, VersionRange} import org.apache.kafka.queue.EventQueue.DeadlineFunction -import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time} +import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.queue.{EventQueue, KafkaEventQueue} import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} @@ -89,18 +89,6 @@ class BrokerLifecycleManager( private val initialTimeoutNs = MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs.longValue()) - /** - * The exponential backoff to use for resending communication. - */ - private val resendExponentialBackoff = - new ExponentialBackoff(100, 2, config.brokerSessionTimeoutMs.toLong / 2, 0.02) - - /** - * The number of times we've tried and failed to communicate. This variable can only be - * read or written from the BrokerToControllerRequestThread. - */ - private var failedAttempts = 0L - /** * The broker incarnation ID. This ID uniquely identifies each time we start the broker */ @@ -440,7 +428,6 @@ class BrokerLifecycleManager( val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse] val errorCode = Errors.forCode(message.data().errorCode()) if (errorCode == Errors.NONE) { - failedAttempts = 0 _brokerEpoch = message.data().brokerEpoch() registered = true initialRegistrationSucceeded = true @@ -514,7 +501,6 @@ class BrokerLifecycleManager( val errorCode = Errors.forCode(message.data().errorCode()) if (errorCode == Errors.NONE) { val responseData = message.data() - failedAttempts = 0 currentOfflineDirs.foreach(cur => offlineDirs.put(cur, true)) _state match { case BrokerState.STARTING => @@ -577,10 +563,9 @@ class BrokerLifecycleManager( } private def scheduleNextCommunicationAfterFailure(): Unit = { - val delayMs = resendExponentialBackoff.backoff(failedAttempts) - failedAttempts = failedAttempts + 1 nextSchedulingShouldBeImmediate = false // never immediately reschedule after a failure - scheduleNextCommunication(NANOSECONDS.convert(delayMs, MILLISECONDS)) + scheduleNextCommunication(NANOSECONDS.convert( + config.brokerHeartbeatIntervalMs.longValue() , MILLISECONDS)) } private def scheduleNextCommunicationAfterSuccess(): Unit = { diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 22c6847ded0..c56e178ca2e 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -412,7 +412,7 @@ class BrokerServer( config, "heartbeat", s"broker-${config.nodeId}-", - config.brokerSessionTimeoutMs / 2 // KAFKA-14392 + config.brokerHeartbeatIntervalMs ) lifecycleManager.start( () => sharedServer.loader.lastAppliedOffset(),