From 32cd8994bf35b65a8053e0caea9a7710cc889df7 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Wed, 22 May 2013 16:23:21 -0700 Subject: [PATCH] kafka-907; controller needs to close socket channel to brokers on exception ; patched by Jun Rao; reviewed by Neha Narkhede --- .../scala/kafka/controller/ControllerChannelManager.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 0c41d1da4bc..38b867467bb 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -122,6 +122,7 @@ class RequestSendThread(val controllerId: Int, try{ lock synchronized { + channel.connect() // establish a socket connection if needed channel.send(request) receive = channel.receive() var response: RequestOrResponse = null @@ -142,8 +143,9 @@ class RequestSendThread(val controllerId: Int, } } catch { case e => - // log it and let it go. Let controller shut it down. - debug("Exception occurs", e) + warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e) + // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated. + channel.disconnect() } } }