mirror of https://github.com/apache/kafka.git
Updated consumer tests to pre-populate kafka logs
This commit is contained in:
parent
e6a41f1c9a
commit
98b725374c
|
@ -102,8 +102,8 @@ class ConsumerPerformanceService(PerformanceService):
|
||||||
parts = last.split(',')
|
parts = last.split(',')
|
||||||
|
|
||||||
self.results[idx-1] = {
|
self.results[idx-1] = {
|
||||||
'total_mb': float(parts[3]),
|
'total_mb': float(parts[2]),
|
||||||
'mbps': float(parts[4]),
|
'mbps': float(parts[3]),
|
||||||
'records_per_sec': float(parts[5]),
|
'records_per_sec': float(parts[5]),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -194,12 +194,21 @@ class Benchmark(KafkaTest):
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def test_single_consumer(self):
|
def test_single_consumer(self):
|
||||||
|
topic = "test-rep-three"
|
||||||
|
|
||||||
|
self.producer = ProducerPerformanceService(
|
||||||
|
self.test_context, 1, self.kafka,
|
||||||
|
topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
|
||||||
|
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
|
||||||
|
)
|
||||||
|
self.producer.run()
|
||||||
|
|
||||||
# All consumer tests use the messages from the first benchmark, so
|
# All consumer tests use the messages from the first benchmark, so
|
||||||
# they'll get messages of the default message size
|
# they'll get messages of the default message size
|
||||||
self.logger.info("BENCHMARK: Single consumer")
|
self.logger.info("BENCHMARK: Single consumer")
|
||||||
self.perf = ConsumerPerformanceService(
|
self.perf = ConsumerPerformanceService(
|
||||||
self.test_context, 1, self.kafka,
|
self.test_context, 1, self.kafka,
|
||||||
topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
|
topic=topic, num_records=self.msgs_default, throughput=-1, threads=1
|
||||||
)
|
)
|
||||||
self.perf.run()
|
self.perf.run()
|
||||||
|
|
||||||
|
@ -208,6 +217,15 @@ class Benchmark(KafkaTest):
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def test_three_consumers(self):
|
def test_three_consumers(self):
|
||||||
|
topic = "test-rep-three"
|
||||||
|
|
||||||
|
self.producer = ProducerPerformanceService(
|
||||||
|
self.test_context, 1, self.kafka,
|
||||||
|
topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
|
||||||
|
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
|
||||||
|
)
|
||||||
|
self.producer.run()
|
||||||
|
|
||||||
self.logger.info("BENCHMARK: Three consumers")
|
self.logger.info("BENCHMARK: Three consumers")
|
||||||
self.perf = ConsumerPerformanceService(
|
self.perf = ConsumerPerformanceService(
|
||||||
self.test_context, 3, self.kafka,
|
self.test_context, 3, self.kafka,
|
||||||
|
|
Loading…
Reference in New Issue