KAFKA-749 Bug in socket server shutdown logic makes the broker hang on shutdown until it has to be killed; reviewed by Sriram and Jay Kreps

This commit is contained in:
Neha Narkhede 2013-02-05 20:11:03 -08:00
parent aed6c3c031
commit 826f02a74e
4 changed files with 8 additions and 3 deletions

View File

@ -126,6 +126,10 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
def addResponseListener(onResponse: Int => Unit) {
responseListeners ::= onResponse
}
def shutdown() {
requestQueue.clear
}
}
object RequestMetrics {

View File

@ -71,6 +71,7 @@ class SocketServer(val brokerId: Int,
acceptor.shutdown()
for(processor <- processors)
processor.shutdown()
requestChannel.shutdown
info("shut down completely")
}
}

View File

@ -32,7 +32,7 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
while(true) {
try {
val req = requestChannel.receiveRequest()
if(req eq RequestChannel.AllDone){
if(req eq RequestChannel.AllDone) {
trace("receives shut down command, shut down".format(brokerId, id))
return
}

View File

@ -110,6 +110,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
info("shutting down")
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
if(socketServer != null)
Utils.swallow(socketServer.shutdown())
if(requestHandlerPool != null)
Utils.swallow(requestHandlerPool.shutdown())
Utils.swallow(kafkaScheduler.shutdown())
@ -119,8 +121,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
Utils.swallow(kafkaZookeeper.shutdown())
if(replicaManager != null)
Utils.swallow(replicaManager.shutdown())
if(socketServer != null)
Utils.swallow(socketServer.shutdown())
if(logManager != null)
Utils.swallow(logManager.shutdown())