Kafka Appender causes Log4j Deadlock; patched by David Arthur; reviewed by Jun Rao; kafka-524

This commit is contained in:
Jun Rao 2013-02-14 09:59:55 -08:00
parent a475d7c434
commit 4d7629dd7f
2 changed files with 33 additions and 7 deletions

View File

@ -31,15 +31,25 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
var topic:String = null
var serializerClass:String = null
var zkConnect:String = null
var zkConnectTimeout:String = null
var zkSessionTimeout:String = null
var brokerList:String = null
private var producer: Producer[String, String] = null
private var config : ProducerConfig = null
@volatile private var initialized:Boolean = false
def getTopic:String = topic
def setTopic(topic: String) { this.topic = topic }
def getZkConnect:String = zkConnect
def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect }
def getZkConnectTimeout:String = zkConnectTimeout
def setZkConnectTimeout(zkConnectTimeout: String) { this.zkConnectTimeout = zkConnectTimeout }
def getZkSessionTimeout:String = zkSessionTimeout
def setZkSessionTimeout(zkSessionTimeout: String) { this.zkSessionTimeout = zkSessionTimeout }
def getBrokerList:String = brokerList
def setBrokerList(brokerList: String) { this.brokerList = brokerList }
@ -51,8 +61,17 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer();
// check for config parameter validity
val props = new Properties()
if( zkConnect == null) connectDiagnostic += "zkConnect"
else props.put("zk.connect", zkConnect);
if(zkConnect == null)
connectDiagnostic += "zkConnect"
else {
props.put("zk.connect", zkConnect)
if(zkConnectTimeout != null)
props.put("zk.connectiontimeout.ms", zkConnectTimeout)
if(zkSessionTimeout != null)
props.put("zk.sessiontimeout.ms", zkSessionTimeout)
}
if( brokerList == null) connectDiagnostic += "brokerList"
else if( props.isEmpty) props.put("broker.list", brokerList)
if(props.isEmpty )
@ -66,13 +85,18 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
LogLog.warn("Using default encoder - kafka.serializer.StringEncoder")
}
props.put("serializer.class", serializerClass)
val config : ProducerConfig = new ProducerConfig(props)
producer = new Producer[String, String](config)
LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
LogLog.debug("Logging for topic: " + topic)
config = new ProducerConfig(props)
}
override def append(event: LoggingEvent) {
// AppenderSkeleton#append serialized via AppenderSkeleton#doAppend
// so it is safe to do this
if(!initialized) {
producer = new Producer[String, String](config)
LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
LogLog.debug("Logging for topic: " + topic)
initialized = true
}
val message : String = if( this.layout == null) {
event.getRenderedMessage
}
@ -86,7 +110,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
override def close() {
if(!this.closed) {
this.closed = true
producer.close()
if(initialized)
producer.close()
}
}

View File

@ -220,6 +220,7 @@ class KafkaLog4jAppenderTest extends JUnitSuite with Logging {
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
props.put("log4j.appender.KAFKA.ZkConnectTimeout", "10000")
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
props