KAFKA-18637: Fix max connections per ip and override reconfigurations (#19099)

Reviewers: Christo Lolov <lolovc@amazon.com>, TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>
This commit is contained in:
Azhar Ahmed 2025-03-10 00:27:48 -07:00 committed by GitHub
parent 75260cf92c
commit 832dfa36da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 26 additions and 2 deletions

View File

@ -309,8 +309,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val socketReceiveBufferBytes = getInt(SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_CONFIG) val socketReceiveBufferBytes = getInt(SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_CONFIG)
val socketRequestMaxBytes = getInt(SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG) val socketRequestMaxBytes = getInt(SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG)
val socketListenBacklogSize = getInt(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG) val socketListenBacklogSize = getInt(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG)
val maxConnectionsPerIp = getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG) def maxConnectionsPerIp = getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG)
val maxConnectionsPerIpOverrides: Map[String, Int] = def maxConnectionsPerIpOverrides: Map[String, Int] =
getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG)).map { case (k, v) => (k, v.toInt)} getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG)).map { case (k, v) => (k, v.toInt)}
def maxConnections = getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG) def maxConnections = getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG)
def maxConnectionCreationRate = getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG) def maxConnectionCreationRate = getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)

View File

@ -482,6 +482,30 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1)) waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSocketServerConfigTest(quorum: String, groupProtocol: String): Unit = {
val updatedMaxConnections = "20"
val connectionsIpsOverride = "1.2.3.4:1234,1.2.4.5:2345"
val properties = new Properties()
properties.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG, updatedMaxConnections)
properties.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, connectionsIpsOverride)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, properties, true)
servers.foreach(_.shutdown())
servers.foreach(_.awaitShutdown())
servers.foreach(_.startup())
servers.foreach { broker =>
assertEquals(updatedMaxConnections, broker.config.originals()
.get(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG).toString)
assertEquals(connectionsIpsOverride, broker.config.originals()
.get(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG).toString)
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testLogCleanerConfig(quorum: String, groupProtocol: String): Unit = { def testLogCleanerConfig(quorum: String, groupProtocol: String): Unit = {