# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
from ducktape.mark import parametrize, matrix
from kafkatest.tests.kafka_test import KafkaTest

from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.version import DEV_BRANCH


class StreamsSimpleBenchmarkTest(Test):
    """
    Simple benchmark of Kafka Streams.
    """

    def __init__(self, test_context):
        super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
        self.num_records = 10000000L
        self.replication = 1
        self.num_threads = 1

    @cluster(num_nodes=9)
    @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin", "yahoo"], scale=[1, 3])
    def test_simple_benchmark(self, test, scale):
        """
        Run a simple Kafka Streams benchmark.
        """
        self.driver = [None] * (scale + 1)
        node = [None] * (scale)
        data = [None] * (scale)

        #############
        # SETUP PHASE
        #############
        self.zk = ZookeeperService(self.test_context, num_nodes=1)
        self.zk.start()
        self.kafka = KafkaService(self.test_context, num_nodes=scale, zk=self.zk, version=DEV_BRANCH, topics={
            'simpleBenchmarkSourceTopic' : { 'partitions': scale, 'replication-factor': self.replication },
            'countTopic' : { 'partitions': scale, 'replication-factor': self.replication },
            'simpleBenchmarkSinkTopic' : { 'partitions': scale, 'replication-factor': self.replication },
            'joinSourceTopic1KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
            'joinSourceTopic2KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
            'joinSourceTopic1KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
            'joinSourceTopic2KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
            'joinSourceTopic1KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
            'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
            'yahooCampaigns' : { 'partitions': 20, 'replication-factor': self.replication },
            'yahooEvents' : { 'partitions': 20, 'replication-factor': self.replication }
        })
        self.kafka.log_level = "INFO"
        self.kafka.start()

        ################
        # LOAD PHASE
        ################
        # Pre-populate the source topics; "true" marks this as the load pass
        # (the benchmark drivers below pass "false").
        self.load_driver = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
                                                         self.num_records * scale, "true", test,
                                                         self.num_threads)
        self.load_driver.start()
        self.load_driver.wait()
        self.load_driver.stop()

        ################
        # RUN PHASE
        ################
        # Start `scale` benchmark driver instances, each processing an equal share of the records.
        for num in range(0, scale):
            self.driver[num] = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
                                                             self.num_records / scale, "false", test,
                                                             self.num_threads)
            self.driver[num].start()

        #######################
        # STOP + COLLECT PHASE
        #######################
        for num in range(0, scale):
            self.driver[num].wait()
            self.driver[num].stop()
            node[num] = self.driver[num].node
            node[num].account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
            data[num] = self.driver[num].collect_data(node[num], "")

        # Merge the per-driver results into a single dict, suffixing each metric key
        # with the driver index so results from different instances do not collide.
        final = {}
        for num in range(0, scale):
            for key in data[num]:
                final[key + str(num)] = data[num][key]

        return final
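
# Usage sketch (assumptions: the ducktape CLI is on PATH, and the file path below is
# illustrative of the local checkout layout). A single combination of the @matrix
# parameters above can typically be selected with ducktape's --parameters JSON option:
#
#   ducktape streams_simple_benchmark.py::StreamsSimpleBenchmarkTest.test_simple_benchmark \
#       --parameters '{"test": "count", "scale": 1}'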