Updated consumer tests to pre-populate kafka logs

This commit is contained in:
Geoff Anderson 2015-06-15 19:18:28 -07:00
parent e6a41f1c9a
commit 98b725374c
2 changed files with 21 additions and 3 deletions

View File

@ -102,8 +102,8 @@ class ConsumerPerformanceService(PerformanceService):
parts = last.split(',') parts = last.split(',')
self.results[idx-1] = { self.results[idx-1] = {
'total_mb': float(parts[3]), 'total_mb': float(parts[2]),
'mbps': float(parts[4]), 'mbps': float(parts[3]),
'records_per_sec': float(parts[5]), 'records_per_sec': float(parts[5]),
} }

View File

@ -194,12 +194,21 @@ class Benchmark(KafkaTest):
return data return data
def test_single_consumer(self): def test_single_consumer(self):
topic = "test-rep-three"
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.producer.run()
# All consumer tests use the messages from the first benchmark, so # All consumer tests use the messages from the first benchmark, so
# they'll get messages of the default message size # they'll get messages of the default message size
self.logger.info("BENCHMARK: Single consumer") self.logger.info("BENCHMARK: Single consumer")
self.perf = ConsumerPerformanceService( self.perf = ConsumerPerformanceService(
self.test_context, 1, self.kafka, self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1 topic=topic, num_records=self.msgs_default, throughput=-1, threads=1
) )
self.perf.run() self.perf.run()
@ -208,6 +217,15 @@ class Benchmark(KafkaTest):
return data return data
def test_three_consumers(self): def test_three_consumers(self):
topic = "test-rep-three"
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.producer.run()
self.logger.info("BENCHMARK: Three consumers") self.logger.info("BENCHMARK: Three consumers")
self.perf = ConsumerPerformanceService( self.perf = ConsumerPerformanceService(
self.test_context, 3, self.kafka, self.test_context, 3, self.kafka,