KAFKA-19432 Add an ERROR log message if broker.heartbeat.interval.ms is too large (#20046)

* Log error message if `broker.heartbeat.interval.ms * 2` is large than
`broker.session.timeout.ms`.
* Add test case

`testLogBrokerHeartbeatIntervalMsShouldBeLowerThanHalfOfBrokerSessionTimeoutMs`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2025-09-04 03:40:21 +08:00 committed by GitHub
parent a9bce0647f
commit ea5b5fec32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 25 additions and 0 deletions

View File

@ -513,6 +513,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
" to prevent frequent changes in ISR")
if (brokerHeartbeatIntervalMs * 2 > brokerSessionTimeoutMs) {
error(s"${KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG} ($brokerHeartbeatIntervalMs ms) must be less than or equal to half of the ${KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG} ($brokerSessionTimeoutMs ms). " +
s"The ${KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG} is configured on controller. The ${KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG} is configured on broker. " +
s"If a broker doesn't send heartbeat request within ${KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG}, it loses broker lease. " +
s"Please increase ${KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG} or decrease ${KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG}.")
}
val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(l => ListenerName.normalised(l.listener)).toSet
// validate KRaft-related configs

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{CompressionType, Records}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.utils.LogCaptureAppender
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
@ -40,11 +41,13 @@ import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfi
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.storage.internals.log.CleanerConfig
import org.apache.logging.log4j.Level
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
import scala.jdk.CollectionConverters._
import scala.util.Using
class KafkaConfigTest {
@ -1899,4 +1902,19 @@ class KafkaConfigTest {
val message = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage
assertEquals("requirement failed: controller.listener.names must contain at least one value appearing in the 'listeners' configuration when running the KRaft controller role", message)
}
@Test
def testLogBrokerHeartbeatIntervalMsShouldBeLowerThanHalfOfBrokerSessionTimeoutMs(): Unit = {
val props = createDefaultConfig()
Using.resource(LogCaptureAppender.createAndRegister) { appender =>
appender.setClassLogger(KafkaConfig.getClass, Level.ERROR)
props.setProperty(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, "4500")
props.setProperty(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, "8999")
KafkaConfig.fromProps(props)
assertTrue(appender.getMessages.contains("broker.heartbeat.interval.ms (4500 ms) must be less than or equal to half of the broker.session.timeout.ms (8999 ms). " +
"The broker.session.timeout.ms is configured on controller. The broker.heartbeat.interval.ms is configured on broker. " +
"If a broker doesn't send heartbeat request within broker.session.timeout.ms, it loses broker lease. " +
"Please increase broker.session.timeout.ms or decrease broker.heartbeat.interval.ms."))
}
}
}