kafka-907; controller needs to close socket channel to brokers on exception ; patched by Jun Rao; reviewed by Neha Narkhede

This commit is contained in:
Jun Rao 2013-05-22 16:23:21 -07:00
parent e93937c886
commit 32cd8994bf
1 changed files with 4 additions and 2 deletions

View File

@ -122,6 +122,7 @@ class RequestSendThread(val controllerId: Int,
try{
lock synchronized {
channel.connect() // establish a socket connection if needed
channel.send(request)
receive = channel.receive()
var response: RequestOrResponse = null
@ -142,8 +143,9 @@ class RequestSendThread(val controllerId: Int,
}
} catch {
case e =>
// log it and let it go. Let controller shut it down.
debug("Exception occurs", e)
warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
// If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
channel.disconnect()
}
}
}