diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 9ba7ee734d9..addc9061608 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -182,7 +182,10 @@ public class ConfigDef { if (value instanceof List) return (List) value; else if (value instanceof String) - return Arrays.asList(trimmed.split("\\s*,\\s*", -1)); + if (trimmed.isEmpty()) + return Collections.emptyList(); + else + return Arrays.asList(trimmed.split("\\s*,\\s*", -1)); else throw new ConfigException(name, value, "Expected a comma separated list."); case CLASS: diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 0067a5375ed..6105726cb41 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -26,36 +26,40 @@ import java.util.{Properties, Date} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} class KafkaLog4jAppender extends AppenderSkeleton with Logging { - var topic:String = null - var brokerList:String = null - var compressionCodec:String = null + var topic: String = null + var brokerList: String = null + var compressionType: String = null var requiredNumAcks: Int = Int.MaxValue + var syncSend: Boolean = false private var producer: KafkaProducer = null - def getTopic:String = topic + def getTopic: String = topic def setTopic(topic: String) { this.topic = topic } - def getBrokerList:String = brokerList + def getBrokerList: String = brokerList def setBrokerList(brokerList: String) { this.brokerList = brokerList } - def getCompressionCodec:String = compressionCodec - def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec } + def getCompressionType: String = compressionType + def setCompressionType(compressionType: String) { this.compressionType = compressionType } - def getRequiredNumAcks:Int = requiredNumAcks - def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks } + def getRequiredNumAcks: Int = requiredNumAcks + def setRequiredNumAcks(requiredNumAcks: Int) { this.requiredNumAcks = requiredNumAcks } + + def getSyncSend: Boolean = syncSend + def setSyncSend(syncSend: Boolean) { this.syncSend = syncSend } override def activateOptions() { // check for config parameter validity val props = new Properties() if(brokerList != null) - props.put("metadata.broker.list", brokerList) + props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) if(props.isEmpty) - throw new MissingConfigException("The metadata.broker.list property should be specified") + throw new MissingConfigException("The bootstrap servers property should be specified") if(topic == null) throw new MissingConfigException("topic must be specified by the Kafka log4j appender") - if(compressionCodec != null) props.put("compression.codec", compressionCodec) - if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString) + if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) + if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString) producer = new KafkaProducer(props) LogLog.debug("Kafka producer connected to " + brokerList) LogLog.debug("Logging for topic: " + topic) @@ -64,7 +68,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { override def append(event: LoggingEvent) { val message = subAppend(event) LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message) - producer.send(new ProducerRecord(topic, message.getBytes())); + val response = producer.send(new ProducerRecord(topic, message.getBytes())) + if (syncSend) response.get } def subAppend(event: LoggingEvent): String = { diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 67497dd042d..bbfb01ed486 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -55,7 +55,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with val logDirZkPath = propsZk.getProperty("log.dir") logDirZk = new File(logDirZkPath) config = new KafkaConfig(propsZk) - server = TestUtils.createServer(config); + server = TestUtils.createServer(config) simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "") } @@ -69,16 +69,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with @Test def testKafkaLog4jConfigs() { + // host missing var props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.Topic", "test-topic") - props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder") props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") - // port missing try { PropertyConfigurator.configure(props) fail("Missing properties exception was expected !") @@ -86,54 +85,20 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with case e: MissingConfigException => } - props = new Properties() - props.put("log4j.rootLogger", "INFO") - props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") - props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.Topic", "test-topic") - props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder") - props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") - - // host missing - try { - PropertyConfigurator.configure(props) - fail("Missing properties exception was expected !") - }catch { - case e: MissingConfigException => - } - - props = new Properties() - props.put("log4j.rootLogger", "INFO") - props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") - props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder") - props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) - props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") - // topic missing - try { - PropertyConfigurator.configure(props) - fail("Missing properties exception was expected !") - }catch { - case e: MissingConfigException => - } - props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) - props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") - // serializer missing try { PropertyConfigurator.configure(props) + fail("Missing properties exception was expected !") }catch { - case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder") + case e: MissingConfigException => } } @@ -156,15 +121,16 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with } private def getLog4jConfig: Properties = { - var props = new Properties() + val props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") - props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) + props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") + props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put("log4j.appender.KAFKA.Topic", "test-topic") - props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") - props.put("log4j.appender.KAFKA.requiredNumAcks", "1") + props.put("log4j.appender.KAFKA.RequiredNumAcks", "1") + props.put("log4j.appender.KAFKA.SyncSend", "true") + props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") props } }