diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 417da27f5c5..20c49d44f11 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -31,15 +31,25 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var topic:String = null var serializerClass:String = null var zkConnect:String = null + var zkConnectTimeout:String = null + var zkSessionTimeout:String = null var brokerList:String = null private var producer: Producer[String, String] = null + private var config : ProducerConfig = null + @volatile private var initialized:Boolean = false def getTopic:String = topic def setTopic(topic: String) { this.topic = topic } def getZkConnect:String = zkConnect def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect } + + def getZkConnectTimeout:String = zkConnectTimeout + def setZkConnectTimeout(zkConnectTimeout: String) { this.zkConnectTimeout = zkConnectTimeout } + + def getZkSessionTimeout:String = zkSessionTimeout + def setZkSessionTimeout(zkSessionTimeout: String) { this.zkSessionTimeout = zkSessionTimeout } def getBrokerList:String = brokerList def setBrokerList(brokerList: String) { this.brokerList = brokerList } @@ -51,8 +61,17 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer(); // check for config parameter validity val props = new Properties() - if( zkConnect == null) connectDiagnostic += "zkConnect" - else props.put("zk.connect", zkConnect); + + if(zkConnect == null) + connectDiagnostic += "zkConnect" + else { + props.put("zk.connect", zkConnect) + if(zkConnectTimeout != null) + props.put("zk.connectiontimeout.ms", zkConnectTimeout) + if(zkSessionTimeout != null) + props.put("zk.sessiontimeout.ms", zkSessionTimeout) + } + if( brokerList == null) connectDiagnostic += "brokerList" else if( props.isEmpty) props.put("broker.list", brokerList) if(props.isEmpty ) @@ -66,13 +85,18 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { LogLog.warn("Using default encoder - kafka.serializer.StringEncoder") } props.put("serializer.class", serializerClass) - val config : ProducerConfig = new ProducerConfig(props) - producer = new Producer[String, String](config) - LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect)) - LogLog.debug("Logging for topic: " + topic) + config = new ProducerConfig(props) } override def append(event: LoggingEvent) { + // AppenderSkeleton#append serialized via AppenderSkeleton#doAppend + // so it is safe to do this + if(!initialized) { + producer = new Producer[String, String](config) + LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect)) + LogLog.debug("Logging for topic: " + topic) + initialized = true + } val message : String = if( this.layout == null) { event.getRenderedMessage } @@ -86,7 +110,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { override def close() { if(!this.closed) { this.closed = true - producer.close() + if(initialized) + producer.close() } } diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 7f67eb3d808..1e93a9fa103 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -220,6 +220,7 @@ class KafkaLog4jAppenderTest extends JUnitSuite with Logging { props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect) + props.put("log4j.appender.KAFKA.ZkConnectTimeout", "10000") props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") props