KAFKA-1366 Multiple Unit Test failures with new producer; reviewed by Neha Narkhede

This commit is contained in:
Guozhang Wang 2014-04-08 10:15:01 -07:00 committed by Neha Narkhede
parent a840c73c36
commit 8f94bc3315
3 changed files with 34 additions and 60 deletions

View File

@@ -182,7 +182,10 @@ public class ConfigDef {
if (value instanceof List)
return (List<?>) value;
else if (value instanceof String)
return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
if (trimmed.isEmpty())
return Collections.emptyList();
else
return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
else
throw new ConfigException(name, value, "Expected a comma separated list.");
case CLASS:

View File

@@ -26,36 +26,40 @@ import java.util.{Properties, Date}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
class KafkaLog4jAppender extends AppenderSkeleton with Logging {
var topic:String = null
var brokerList:String = null
var compressionCodec:String = null
var topic: String = null
var brokerList: String = null
var compressionType: String = null
var requiredNumAcks: Int = Int.MaxValue
var syncSend: Boolean = false
private var producer: KafkaProducer = null
def getTopic:String = topic
def getTopic: String = topic
def setTopic(topic: String) { this.topic = topic }
def getBrokerList:String = brokerList
def getBrokerList: String = brokerList
def setBrokerList(brokerList: String) { this.brokerList = brokerList }
def getCompressionCodec:String = compressionCodec
def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec }
def getCompressionType: String = compressionType
def setCompressionType(compressionType: String) { this.compressionType = compressionType }
def getRequiredNumAcks:Int = requiredNumAcks
def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks }
def getRequiredNumAcks: Int = requiredNumAcks
def setRequiredNumAcks(requiredNumAcks: Int) { this.requiredNumAcks = requiredNumAcks }
def getSyncSend: Boolean = syncSend
def setSyncSend(syncSend: Boolean) { this.syncSend = syncSend }
override def activateOptions() {
// check for config parameter validity
val props = new Properties()
if(brokerList != null)
props.put("metadata.broker.list", brokerList)
props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
if(props.isEmpty)
throw new MissingConfigException("The metadata.broker.list property should be specified")
throw new MissingConfigException("The bootstrap servers property should be specified")
if(topic == null)
throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
if(compressionCodec != null) props.put("compression.codec", compressionCodec)
if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString)
if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
producer = new KafkaProducer(props)
LogLog.debug("Kafka producer connected to " + brokerList)
LogLog.debug("Logging for topic: " + topic)
@@ -64,7 +68,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
override def append(event: LoggingEvent) {
val message = subAppend(event)
LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
producer.send(new ProducerRecord(topic, message.getBytes()));
val response = producer.send(new ProducerRecord(topic, message.getBytes()))
if (syncSend) response.get
}
def subAppend(event: LoggingEvent): String = {

View File

@@ -55,7 +55,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
val logDirZkPath = propsZk.getProperty("log.dir")
logDirZk = new File(logDirZkPath)
config = new KafkaConfig(propsZk)
server = TestUtils.createServer(config);
server = TestUtils.createServer(config)
simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
}
@@ -69,16 +69,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
@Test
def testKafkaLog4jConfigs() {
// host missing
var props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
// port missing
try {
PropertyConfigurator.configure(props)
fail("Missing properties exception was expected !")
@@ -86,54 +85,20 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
case e: MissingConfigException =>
}
props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
// host missing
try {
PropertyConfigurator.configure(props)
fail("Missing properties exception was expected !")
}catch {
case e: MissingConfigException =>
}
props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
// topic missing
try {
PropertyConfigurator.configure(props)
fail("Missing properties exception was expected !")
}catch {
case e: MissingConfigException =>
}
props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
// serializer missing
try {
PropertyConfigurator.configure(props)
fail("Missing properties exception was expected !")
}catch {
case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
case e: MissingConfigException =>
}
}
@@ -156,15 +121,16 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
}
private def getLog4jConfig: Properties = {
var props = new Properties()
val props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
props.put("log4j.appender.KAFKA.requiredNumAcks", "1")
props.put("log4j.appender.KAFKA.RequiredNumAcks", "1")
props.put("log4j.appender.KAFKA.SyncSend", "true")
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
props
}
}