Liars in PrimitiveApiTest that promise to test api in compression mode, but don't do this actually; patched by Kostya Golikov; reviewed by Guozhang Wang and Jun Rao

This commit is contained in:
Kostya Golikov 2014-02-17 11:43:45 -08:00 committed by Jun Rao
parent dc5a993e67
commit f550cc76cd
1 changed files with 40 additions and 102 deletions

View File

@ -35,12 +35,12 @@ import kafka.utils.{TestUtils, Utils}
* End to end tests of the primitive apis against a local server
*/
class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
val port = TestUtils.choosePort
val port = TestUtils.choosePort()
val props = TestUtils.createBrokerConfig(0, port)
val config = new KafkaConfig(props)
val configs = List(config)
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
def testFetchRequestCanProperlySerialize() {
val request = new FetchRequestBuilder()
@ -100,7 +100,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val stringProducer1 = new Producer[String, String](config)
stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val messageSet = fetched.messageSet(topic, 0)
assertTrue(messageSet.iterator.hasNext)
@ -108,8 +108,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
}
def testProduceAndMultiFetch() {
createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
private def produceAndMultiFetch(producer: Producer[String, String]) {
createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"))
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@ -171,117 +171,56 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
requestHandlerLogger.setLevel(Level.ERROR)
}
def testProduceAndMultiFetchWithCompression() {
createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics) {
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
messages += topic -> messageList
producer.send(producerData:_*)
builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
}
}
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
{
// send some invalid offsets
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics)
builder.addFetch(topic, partition, -1, 10000)
try {
val request = builder.build()
val response = consumer.fetch(request)
response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid offset")
} catch {
case e: OffsetOutOfRangeException => "this is good"
}
}
{
// send some invalid partitions
val builder = new FetchRequestBuilder()
for( (topic, _) <- topics)
builder.addFetch(topic, -1, 0, 10000)
try {
val request = builder.build()
val response = consumer.fetch(request)
response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid partition")
} catch {
case e: UnknownTopicOrPartitionException => "this is good"
}
}
// restore set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.ERROR)
def testProduceAndMultiFetch() {
val props = producer.config.props.props
val config = new ProducerConfig(props)
val noCompressionProducer = new Producer[String, String](config)
produceAndMultiFetch(noCompressionProducer)
}
def testMultiProduce() {
createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
def testProduceAndMultiFetchWithCompression() {
val props = producer.config.props.props
props.put("compression", "true")
val config = new ProducerConfig(props)
val producerWithCompression = new Producer[String, String](config)
produceAndMultiFetch(producerWithCompression)
}
private def multiProduce(producer: Producer[String, String]) {
val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
var produceList: List[KeyedMessage[String, String]] = Nil
for( (topic, partition) <- topics) {
for((topic, partition) <- topics) {
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
messages += topic -> messageList
producer.send(producerData:_*)
builder.addFetch(topic, partition, 0, 10000)
}
producer.send(produceList: _*)
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
for((topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
}
}
def testMultiProduceWithCompression() {
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
var produceList: List[KeyedMessage[String, String]] = Nil
for( (topic, partition) <- topics) {
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
messages += topic -> messageList
producer.send(producerData:_*)
builder.addFetch(topic, partition, 0, 10000)
}
producer.send(produceList: _*)
def testMultiProduce() {
val props = producer.config.props.props
val config = new ProducerConfig(props)
val noCompressionProducer = new Producer[String, String](config)
multiProduce(noCompressionProducer)
}
// wait a bit for produced message to be available
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val fetched = response.messageSet(topic, 0)
assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
}
def testMultiProduceWithCompression() {
val props = producer.config.props.props
props.put("compression", "true")
val config = new ProducerConfig(props)
val producerWithCompression = new Producer[String, String](config)
multiProduce(producerWithCompression)
}
def testConsumerEmptyTopic() {
@ -294,16 +233,15 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
}
def testPipelinedProduceRequests() {
createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
val props = producer.config.props.props
props.put("request.required.acks", "0")
val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props))
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
var produceList: List[KeyedMessage[String, String]] = Nil
for( (topic, partition) <- topics) {
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
@ -338,10 +276,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
* For testing purposes, just create these topics each with one partition and one replica for
* which the provided broker should the leader for. Create and wait for broker to lead. Simple.
*/
def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
private def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) {
for( topic <- topics ) {
AdminUtils.createTopic(zkClient, topic, 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0, timeoutMs = 500)
}
}
}