MINOR: Remove acceptor creation in network thread update code (#4742)

Fix dynamic addition of network threads to only create new Processor threads and not the Acceptor.
This commit is contained in:
Rajini Sivaram 2018-03-21 05:40:16 +00:00 committed by Jason Gustafson
parent f5287ccad2
commit 2f90cb86c1
2 changed files with 29 additions and 18 deletions

View File

@ -81,7 +81,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
createProcessors(config.numNetworkThreads, config.listeners)
createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
}
newGauge("NetworkProcessorAvgIdlePercent",
@ -112,8 +112,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
private def createProcessors(newProcessorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit = synchronized {
private def createAcceptorAndProcessors(processorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit = synchronized {
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
@ -122,24 +122,30 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
endpoints.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
for (_ <- 0 until newProcessorsPerListener) {
val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
listenerProcessors += processor
requestChannel.addProcessor(processor)
nextProcessorId += 1
}
listenerProcessors.foreach(p => processors.put(p.id, p))
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
acceptor.awaitStartup()
acceptors.put(endpoint, acceptor)
acceptor.addProcessors(listenerProcessors)
addProcessors(acceptor, endpoint, processorsPerListener)
}
}
private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
for (_ <- 0 until newProcessorsPerListener) {
val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
listenerProcessors += processor
requestChannel.addProcessor(processor)
nextProcessorId += 1
}
listenerProcessors.foreach(p => processors.put(p.id, p))
acceptor.addProcessors(listenerProcessors)
}
/**
* Stop processing requests and new connections.
*/
@ -156,9 +162,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
if (newNumNetworkThreads > oldNumNetworkThreads)
createProcessors(newNumNetworkThreads - oldNumNetworkThreads, config.listeners)
else if (newNumNetworkThreads < oldNumNetworkThreads)
if (newNumNetworkThreads > oldNumNetworkThreads) {
acceptors.asScala.foreach { case (endpoint, acceptor) =>
addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
}
} else if (newNumNetworkThreads < oldNumNetworkThreads)
acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
}
@ -187,7 +195,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
info(s"Adding listeners for endpoints $listenersAdded")
createProcessors(config.numNetworkThreads, listenersAdded)
createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded)
}
def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {

View File

@ -420,7 +420,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
def increasePoolSize(propName: String, currentSize: => Int, threadPrefix: String): Int = {
val newSize = currentSize * 2 - 1
val newSize = if (currentSize == 1) currentSize * 2 else currentSize * 2 - 1
resizeThreadPool(propName, newSize, threadPrefix)
newSize
}
@ -444,6 +444,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
Thread.sleep(100)
}
stopAndVerifyProduceConsume(producerThread, consumerThread, mayReceiveDuplicates)
// Verify that all threads are alive
maybeVerifyThreadPoolSize(propName, threadPoolSize, threadPrefix)
}
val config = servers.head.config
@ -457,6 +459,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
"", mayReceiveDuplicates = false)
verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
networkThreadPrefix, mayReceiveDuplicates = true)
verifyThreads("kafka-socket-acceptor-", config.listeners.size)
verifyProcessorMetrics()
verifyMarkPartitionsForTruncation()