KAFKA-440 Regression/system test framework; patched by John Fung; reviewed by Neha Narkhede

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1376147 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Neha Narkhede 2012-08-22 17:16:26 +00:00
parent 663d254433
commit 5861046d09
21 changed files with 2073 additions and 0 deletions

View File

@ -28,3 +28,5 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
#log4j.logger.kafka=INFO
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
log4j.logger.kafka.perf=DEBUG
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG

80
system_test/README.txt Normal file
View File

@ -0,0 +1,80 @@
# ==========================
# Known Issues:
# ==========================
1. The "broker-list" in system_test/replication_testsuite/testcase_1/testcase_1_properties.json needs to be manually updated to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
2. Sometimes the running processes may not be terminated properly by the script.
# ==========================
# Overview
# ==========================
"system_test" is now transformed to a system regression test framework intended for the automation of system / integration testing of data platform software such as Kafka. The test framework is implemented in Python which is a popular scripting language with well supported features.
The framework has the following levels:
1. The first level is generic and does not depend on any product specific details.
location: system_test
a. system_test_runner.py - It implements the main class RegTest as an entry point.
b. system_test_env.py - It implements the class RegTestEnv which defines the testing environment of a test session such as the base directory and environment variables specific to the local machine.
2. The second level defines a suite of testing such as Kafka's replication (including basic testing, failure testing, ... etc)
location: system_test/<suite directory name>*.
* Please note the test framework will look for a specific suffix of the directories under system_test to determine what test suites are available. The suffix of <suite directory name> can be defined in SystemTestEnv class (system_test_env.py)
a. replica_basic_test.py - This is a test module file. It implements the test logic for basic replication testing as follows:
i. start zookeepers
ii. start brokers
iii. create kafka topics
iv. lookup the brokerid as a leader
v. terminate the leader (if defined in the testcase config json file)
vi. start producer to send n messages
vii. start consumer to receive messages
viii. validate if there is data loss
b. config/ - This config directory provides templates for all properties files needed for zookeeper, brokers, producer and consumer (any changes in the files under this directory would be reflected or overwritten by the settings under testcase_<n>/testcase_<n>_properties.json)
d. testcase_<n>** - The testcase directory contains the testcase argument definition file: testcase_1_properties.json. This file defines the specific configurations for the testcase such as the followings (eg. producer related):
i. no. of producer threads
ii. no. of messages to produce
iii. zkconnect string
When this test case is being run, the test framework will copy and update the template properties files to testcase_<n>/config. The logs of various components will be saved in testcase_<n>/logs
** Please note the test framework will look for a specific prefix of the directories under system_test/<test suite dir>/ to determine what test cases are available. The prefix of <testcase directory name> can be defined in SystemTestEnv class (system_test_env.py)
# ==========================
# Quick Start
# ==========================
* Please note that the following commands should be executed after downloading the kafka source code to build all the required binaries:
1. <kafka install dir>/ $ ./sbt update package
Now you are ready to follow the steps below.
1. Update system_test/cluster_config.json for "kafka_home" & "java_home" specific to your environment
2. Edit system_test/replication_testsuite/testcase_1/testcase_1_properties.json and update "broker-list" to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
3. To run the test, go to <kafka_home>/system_test and run the following command:
$ python -B system_test_runner.py
# ==========================
# Adding Test Case
# ==========================
To create a new test suite called "broker_testsuite", please do the followings:
1. Copy and paste system_test/replication_testsuite => system_test/broker_testsuite
2. Rename system_test/broker_testsuite/replica_basic_test.py => system_test/broker_testsuite/broker_basic_test.py
3. Edit system_test/broker_testsuite/broker_basic_test.py and update all ReplicaBasicTest related class name to BrokerBasicTest (as an example)
4. Follow the flow of system_test/broker_testsuite/broker_basic_test.py and modify the necessary test logic accordingly.
To create a new test case under "replication_testsuite", please do the followings:
1. Copy and paste system_test/replication_testsuite/testcase_1 => system_test/replication_testsuite/testcase_2
2. Rename system_test/replication_testsuite/testcase_2/testcase_1_properties.json => system_test/replication_testsuite/testcase_2/testcase_2_properties.json
3. Update system_test/replication_testsuite/testcase_2/testcase_2_properties.json with the corresponding settings for testcase 2.

1
system_test/__init__.py Normal file
View File

@ -0,0 +1 @@

View File

@ -0,0 +1,52 @@
{
"cluster_config": [
{
"entity_id": "0",
"hostname": "localhost",
"role": "zookeeper",
"kafka_home": "/home/nnarkhed/Projects/kafka-440",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9990"
},
{
"entity_id": "1",
"hostname": "localhost",
"role": "broker",
"kafka_home": "/home/nnarkhed/Projects/kafka-440",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9991"
},
{
"entity_id": "2",
"hostname": "localhost",
"role": "broker",
"kafka_home": "/home/nnarkhed/Projects/kafka-440",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9992"
},
{
"entity_id": "3",
"hostname": "localhost",
"role": "broker",
"kafka_home": "/home/nnarkhed/Projects/kafka-440",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9993"
},
{
"entity_id": "4",
"hostname": "localhost",
"role": "producer_performance",
"kafka_home": "/home/nnarkhed/Projects/kafka-440",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9994"
},
{
"entity_id": "5",
"hostname": "localhost",
"role": "console_consumer",
"kafka_home": "/home/nnarkhed/Projects/kafka-440",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9995"
}
]
}

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,4 @@
zookeeper=local:2181
topic=test_1
from-beginning
consumer-timeout-ms=10000

View File

@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.consumer.ConsumerConfig for more details
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=127.0.0.1:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
#consumer group id
groupid=test-consumer-group
#consumer timeout
#consumer.timeout.ms=5000

View File

@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
#log4j.appender.fileAppender=org.apache.log4j.FileAppender
#log4j.appender.fileAppender.File=kafka-request.log
#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
# Turn on all our debugging info
#log4j.logger.kafka=INFO
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
# to print message checksum from ProducerPerformance
log4j.logger.kafka.perf=DEBUG
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
# to print message checksum from ProducerPerformance
log4j.logger.kafka.perf=DEBUG
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG

View File

@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details
############################# Producer Basics #############################
# need to set either broker.list or zk.connect
# configure brokers statically
# format: brokerid1:host1:port1,brokerid2:host2:port2 ...
#broker.list=0:localhost:9092
# discover brokers from ZK
zk.connect=localhost:2181
# zookeeper session timeout; default is 6000
#zk.sessiontimeout.ms=
# the max time that the client waits to establish a connection to zookeeper; default is 6000
#zk.connectiontimeout.ms
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
# specify the compression codec for all data generated: 0: no compression, 1: gzip
compression.codec=0
# message encoder
serializer.class=kafka.serializer.StringEncoder
# allow topic level compression
#compressed.topics=
# max message size; messages larger than that size are discarded; default is 1000000
#max.message.size=
############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.time=
# the maximum size of the blocking queue for buffering on the producer
#queue.size=
# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueueTimeout.ms=
# the number of messages batched at the producer
#batch.size=
# the callback handler for one or multiple events
#callback.handler=
# properties required to initialize the callback handler
#callback.handler.props=
# the handler for events
#event.handler=
# properties required to initialize the event handler
#event.handler.props=

View File

@ -0,0 +1,7 @@
broker-list=localhost:2181
topic=mytest
messages=200
message-size=100
thread=5
initial-message-id=0
compression-codec=0

View File

@ -0,0 +1,120 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
brokerid=0
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
# may not be what you want.
#hostname=
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9091
# The number of threads handling network requests
network.threads=2
# The number of threads doing disk I/O
io.threads=2
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
max.socket.request.bytes=104857600
############################# Log Basics #############################
# The directory under which to store log files
log.dir=/tmp/kafka_server_logs
# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=5
# Overrides for for the default given by num.partitions on a per-topic basis
#topic.partition.count.map=topic1:3, topic2:4
############################# Log Flush Policy #############################
# The following configurations control the flush of data to disk. This is the most
# important performance knob in kafka.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
# 3. Throughput: The flush is generally the most expensive operation.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval=10000
# The maximum amount of time a message can sit in a log before we force a flush
log.default.flush.interval.ms=1000
# Per-topic overrides for log.default.flush.interval.ms
#topic.flush.intervals.ms=topic1:1000, topic2:3000
# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
log.default.flush.scheduler.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.size.
#log.retention.size=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.file.size=536870912
#log.file.size=102400
log.file.size=128
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.cleanup.interval.mins=1
############################# Zookeeper #############################
# Enable connecting to zookeeper
enable.zookeeper=true
# Zk connection string (see zk docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zk.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000

View File

@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

View File

@ -0,0 +1,256 @@
#!/usr/bin/env python
# ===================================
# replica_basic_test.py
# ===================================
import inspect
import logging
import os
import signal
import subprocess
import sys
import time
from system_test_env import SystemTestEnv
sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
from setup_utils import SetupUtils
import system_test_utils
from testcase_env import TestcaseEnv
# product specific: Kafka
import kafka_system_test_utils
class ReplicaBasicTest(SetupUtils):
testModuleAbsPathName = os.path.realpath(__file__)
testSuiteAbsPathName = os.path.abspath(os.path.dirname(testModuleAbsPathName))
isLeaderLogPattern = "Completed the leader state transition"
def __init__(self, systemTestEnv):
# SystemTestEnv - provides cluster level environment settings
# such as entity_id, hostname, kafka_home, java_home which
# are available in a list of dictionary named
# "clusterEntityConfigDictList"
self.systemTestEnv = systemTestEnv
# dict to pass user-defined attributes to logger argument: "extra"
d = {'name_of_class': self.__class__.__name__}
def signal_handler(self, signal, frame):
self.log_message("Interrupt detected - User pressed Ctrl+c")
for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, self.testcaseEnv, entityId, parentPid)
sys.exit(1)
def runTest(self):
# get all testcase directories under this testsuite
testCasePathNameList = system_test_utils.get_dir_paths_with_prefix(
self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
testCasePathNameList.sort()
# =============================================================
# launch each testcase one by one: testcase_1, testcase_2, ...
# =============================================================
for testCasePathName in testCasePathNameList:
try:
# create a new instance of TestcaseEnv to keep track of this testcase's environment variables
self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
# initialize self.testcaseEnv with user-defined environment
self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = \
ReplicaBasicTest.isLeaderLogPattern
self.testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"] = \
"\[(.*?)\] .* Broker (.*?): " + \
self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + \
" for topic (.*?) partition (.*?) \(.*"
self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
# find testcase properties json file
testcasePropJsonPathName = system_test_utils.get_testcase_prop_json_pathname(testCasePathName)
self.logger.debug("testcasePropJsonPathName : " + testcasePropJsonPathName, extra=self.d)
# get the dictionary that contains the testcase arguments and description
testcaseNonEntityDataDict = system_test_utils.get_json_dict_data(testcasePropJsonPathName)
testcaseDirName = os.path.basename(testCasePathName)
self.testcaseEnv.testcaseResultsDict["test_case_name"] = testcaseDirName
#### => update testcaseEnv
self.testcaseEnv.testCaseBaseDir = testCasePathName
self.testcaseEnv.testCaseLogsDir = self.testcaseEnv.testCaseBaseDir + "/logs"
# get testcase description
testcaseDescription = ""
for k,v in testcaseNonEntityDataDict.items():
if ( k == "description" ): testcaseDescription = v
#### => update testcaseEnv
# TestcaseEnv.testcaseArgumentsDict initialized, this dictionary keeps track of the
# "testcase_args" in the testcase_properties.json such as replica_factor, num_partition, ...
self.testcaseEnv.testcaseArgumentsDict = testcaseNonEntityDataDict["testcase_args"]
# =================================================================
# TestcaseEnv environment settings initialization are completed here
# =================================================================
# self.testcaseEnv.systemTestBaseDir
# self.testcaseEnv.testSuiteBaseDir
# self.testcaseEnv.testCaseBaseDir
# self.testcaseEnv.testCaseLogsDir
# self.testcaseEnv.testcaseArgumentsDict
# display testcase name and arguments
self.log_message("Test Case : " + testcaseDirName)
for k,v in self.testcaseEnv.testcaseArgumentsDict.items():
self.anonLogger.info(" " + k + " : " + v)
self.log_message("Description : " + testcaseDescription)
# ================================================================ #
# ================================================================ #
# Product Specific Testing Code Starts Here: #
# ================================================================ #
# ================================================================ #
# initialize signal handler
signal.signal(signal.SIGINT, self.signal_handler)
# create "LOCAL" log directories for metrics, dashboards for each entity under this testcase
# for collecting logs from remote machines
kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
# TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file:
# system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(testcasePropJsonPathName)
# TestcaseEnv - initialize producer & consumer config / log file pathnames
kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
# clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
# generate remote hosts log/config dirs if not exist
kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
# generate properties files for zookeeper, kafka, producer, consumer:
# 1. copy system_test/<suite_name>_testsuite/config/*.properties to
# system_test/<suite_name>_testsuite/testcase_<n>/config/
# 2. update all properties files in system_test/<suite_name>_testsuite/testcase_<n>/config
# by overriding the settings specified in:
# system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName, self.testcaseEnv, self.systemTestEnv)
# =============================================
# preparing all entities to start the test
# =============================================
self.log_message("starting zookeepers")
kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv)
self.anonLogger.info("sleeping for 2s")
time.sleep(2)
self.log_message("starting brokers")
kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
self.anonLogger.info("sleeping for 5s")
time.sleep(5)
self.log_message("creating topics")
kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
self.anonLogger.info("sleeping for 5s")
time.sleep(5)
self.log_message("looking up leader")
leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv)
# ==========================
# leaderDict looks like this:
# ==========================
#{'entity_id': u'3',
# 'partition': '0',
# 'timestamp': 1345050255.8280001,
# 'hostname': u'localhost',
# 'topic': 'test_1',
# 'brokerid': '3'}
# validate to see if leader election is successful
self.log_message("validating leader election")
result = kafka_system_test_utils.validate_leader_election_successful( \
self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
# checking to see if leader bouncing is required in this testcase
bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
self.log_message("bounce_leader flag : " + bounceLeaderFlag)
if (bounceLeaderFlag.lower() == "true"):
if self.testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
# no leader available for testing => skip this round
self.log_message("stopping all entities")
for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
continue
else:
# leader elected => stop leader
try:
leaderEntityId = leaderDict["entity_id"]
leaderBrokerId = leaderDict["brokerid"]
leaderPPid = self.testcaseEnv.entityParentPidDict[leaderEntityId]
except:
self.log_message("leader details unavailable")
self.log_message("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid)
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, leaderEntityId, leaderPPid)
self.testcaseEnv.entityParentPidDict[leaderEntityId] = ""
self.logger.info("sleeping for 5s for leader re-election to complete", extra=self.d)
time.sleep(5)
# starting producer
self.log_message("starting producer")
kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
self.anonLogger.info("sleeping for 5s")
time.sleep(5)
# starting previously terminated broker
if (bounceLeaderFlag.lower() == "true" and not self.testcaseEnv.entityParentPidDict[leaderEntityId]):
self.log_message("starting the previously terminated broker")
stoppedLeaderEntityId = leaderDict["entity_id"]
kafka_system_test_utils.start_entity_in_background(
self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
self.anonLogger.info("sleeping for 5s")
time.sleep(5)
# starting consumer
self.log_message("starting consumer")
kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
# this testcase is completed - so stopping all entities
self.log_message("stopping all entities")
for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
# validate the data matched
self.log_message("validating data matched")
result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
# =============================================
# collect logs from remote hosts
# =============================================
kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
except Exception as e:
self.log_message("Exception caught : ")
print e
self.log_message("stopping all entities")
for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)

View File

@ -0,0 +1,63 @@
{
"description": "Basic test to produce and consume messages to a single topic partition. This test sends messages to n replicas and at the end verifies the log size and contents as well as using a consumer to verify no message loss. Optionally, the test bounces the leader periodically to introduce failures during the message replication.",
"testcase_args": {
"bounce_leader": "true",
"replica_factor": "3",
"num_partition": "2"
},
"entities": [
{
"entity_id": "0",
"clientPort": "2188",
"dataDir": "/tmp/zookeeper_0",
"log_filename": "zookeeper_2188.log",
"config_filename": "zookeeper_2188.properties"
},
{
"entity_id": "1",
"port": "9091",
"brokerid": "1",
"log.file.size": "10240",
"log.dir": "/tmp/kafka_server_1_logs",
"log_filename": "kafka_server_9091.log",
"config_filename": "kafka_server_9091.properties"
},
{
"entity_id": "2",
"port": "9092",
"brokerid": "2",
"log.file.size": "10240",
"log.dir": "/tmp/kafka_server_2_logs",
"log_filename": "kafka_server_9092.log",
"config_filename": "kafka_server_9092.properties"
},
{
"entity_id": "3",
"port": "9093",
"brokerid": "3",
"log.file.size": "10240",
"log.dir": "/tmp/kafka_server_3_logs",
"log_filename": "kafka_server_9093.log",
"config_filename": "kafka_server_9093.properties"
},
{
"entity_id": "4",
"topic": "test_1",
"threads": "5",
"compression-codec": "1",
"message-size": "500",
"message": "500",
"broker-list": "localhost:9091,localhost:9092,localhost:9093",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},
{
"entity_id": "5",
"topic": "test_1",
"groupid": "mytestgroup",
"consumer-timeout-ms": "10000",
"log_filename": "console_consumer.log",
"config_filename": "console_consumer.properties"
}
]
}

View File

@ -0,0 +1,56 @@
#!/usr/bin/env python
# ===================================
# system_test_env.py
# ===================================
import json
import os
import sys
class SystemTestEnv():
# private:
_cwdFullPath = os.getcwd()
_thisScriptFullPathName = os.path.realpath(__file__)
_thisScriptBaseDir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0])))
# public:
SYSTEM_TEST_BASE_DIR = os.path.abspath(_thisScriptBaseDir)
SYSTEM_TEST_UTIL_DIR = os.path.abspath(SYSTEM_TEST_BASE_DIR + "/utils")
SYSTEM_TEST_SUITE_SUFFIX = "_testsuite"
SYSTEM_TEST_CASE_PREFIX = "testcase_"
SYSTEM_TEST_MODULE_EXT = ".py"
CLUSTER_CONFIG_FILENAME = "cluster_config.json"
CLUSTER_CONFIG_PATHNAME = os.path.abspath(SYSTEM_TEST_BASE_DIR + "/" + CLUSTER_CONFIG_FILENAME)
clusterEntityConfigDictList = []
systemTestResultsList = []
def __init__(self):
"Create an object with this system test session environment"
# retrieve each entity's data from cluster config json file
# as "dict" and enter them into a "list"
jsonFileContent = open(self.CLUSTER_CONFIG_PATHNAME, "r").read()
jsonData = json.loads(jsonFileContent)
for key, cfgList in jsonData.items():
if key == "cluster_config":
for cfg in cfgList:
self.clusterEntityConfigDictList.append(cfg)
def getSystemTestEnvDict(self):
envDict = {}
envDict["system_test_base_dir"] = self.SYSTEM_TEST_BASE_DIR
envDict["system_test_util_dir"] = self.SYSTEM_TEST_UTIL_DIR
envDict["cluster_config_pathname"] = self.CLUSTER_CONFIG_PATHNAME
envDict["system_test_suite_suffix"] = self.SYSTEM_TEST_SUITE_SUFFIX
envDict["system_test_case_prefix"] = self.SYSTEM_TEST_CASE_PREFIX
envDict["system_test_module_ext"] = self.SYSTEM_TEST_MODULE_EXT
envDict["cluster_config_pathname"] = self.CLUSTER_CONFIG_PATHNAME
envDict["cluster_entity_config_dict_list"] = self.clusterEntityConfigDictList
envDict["system_test_results_list"] = self.systemTestResultsList
return envDict

View File

@ -0,0 +1,164 @@
#!/usr/bin/evn python
# ===================================
# system_test_runner.py
# ===================================
from system_test_env import SystemTestEnv
from utils import system_test_utils
import logging
import os
import sys
# ====================================================================
# Two logging formats are defined in system_test/system_test_runner.py
# ====================================================================
# 1. "namedLogger" is defined to log message in this format:
# "%(asctime)s - %(levelname)s - %(message)s %(name_of_class)s"
#
# usage: to log message and showing the class name of the message
namedLogger = logging.getLogger("namedLogger")
namedLogger.setLevel(logging.INFO)
#namedLogger.setLevel(logging.DEBUG)
namedConsoleHandler = logging.StreamHandler()
namedConsoleHandler.setLevel(logging.DEBUG)
namedFormatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s %(name_of_class)s")
namedConsoleHandler.setFormatter(namedFormatter)
namedLogger.addHandler(namedConsoleHandler)
# 2. "anonymousLogger" is defined to log message in this format:
# "%(asctime)s - %(levelname)s - %(message)s"
#
# usage: to log message without showing class name and it's appropriate
# for logging generic message such as "sleeping for 5 seconds"
anonymousLogger = logging.getLogger("anonymousLogger")
anonymousLogger.setLevel(logging.INFO)
#anonymousLogger.setLevel(logging.DEBUG)
anonymousConsoleHandler = logging.StreamHandler()
anonymousConsoleHandler.setLevel(logging.DEBUG)
anonymousFormatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
anonymousConsoleHandler.setFormatter(anonymousFormatter)
anonymousLogger.addHandler(anonymousConsoleHandler)
d = {'name_of_class': '(system_test_runner)'}
def main():
print
print
print
anonymousLogger.info("=================================================")
anonymousLogger.info(" System Regression Test Framework")
anonymousLogger.info("=================================================")
print
print
testSuiteClassDictList = []
# SystemTestEnv is a class to provide all environement settings for this session
# such as the SYSTEM_TEST_BASE_DIR, SYSTEM_TEST_UTIL_DIR, ...
systemTestEnv = SystemTestEnv()
# sanity check on remote hosts to make sure:
# - all directories (eg. java_home) specified in cluster_config.json exists in all hosts
# - no conflicting running processes in remote hosts
anonymousLogger.info("=================================================")
anonymousLogger.info("setting up remote hosts ...")
anonymousLogger.info("=================================================")
if not system_test_utils.setup_remote_hosts(systemTestEnv):
namedLogger.error("Remote hosts sanity check failed. Aborting test ...", extra=d)
print
sys.exit(1)
print
# get all defined names within a module:
definedItemList = dir(SystemTestEnv)
anonymousLogger.debug("=================================================")
anonymousLogger.debug("SystemTestEnv keys:")
for item in definedItemList:
anonymousLogger.debug(" " + item)
anonymousLogger.debug("=================================================")
anonymousLogger.info("=================================================")
anonymousLogger.info("looking up test suites ...")
anonymousLogger.info("=================================================")
# find all test suites in SYSTEM_TEST_BASE_DIR
for dirName in os.listdir(systemTestEnv.SYSTEM_TEST_BASE_DIR):
# make sure this is a valid testsuite directory
if os.path.isdir(dirName) and dirName.endswith(systemTestEnv.SYSTEM_TEST_SUITE_SUFFIX):
namedLogger.info("found a testsuite : " + dirName, extra=d)
testModulePathName = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + dirName)
# go through all test modules file in this testsuite
for moduleFileName in os.listdir(testModulePathName):
# make sure it is a valid test module
if moduleFileName.endswith(systemTestEnv.SYSTEM_TEST_MODULE_EXT) \
and not moduleFileName.startswith("__"):
# found a test module file
namedLogger.info("found a test module file : " + moduleFileName, extra=d)
testModuleClassName = system_test_utils.sys_call("grep ^class " + testModulePathName + "/" + \
moduleFileName + " | sed 's/^class //g' | sed 's/(.*):.*//g'")
testModuleClassName = testModuleClassName.rstrip('\n')
# collect the test suite class data
testSuiteClassDict = {}
testSuiteClassDict["suite"] = dirName
extLenToRemove = systemTestEnv.SYSTEM_TEST_MODULE_EXT.__len__() * -1
testSuiteClassDict["module"] = moduleFileName[:extLenToRemove]
testSuiteClassDict["class"] = testModuleClassName
testSuiteClassDictList.append(testSuiteClassDict)
# loop through testSuiteClassDictList and start the test class one by one
for testSuiteClassDict in testSuiteClassDictList:
suiteName = testSuiteClassDict["suite"]
moduleName = testSuiteClassDict["module"]
className = testSuiteClassDict["class"]
# add testsuite directory to sys.path such that the module can be loaded
sys.path.append(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + suiteName)
anonymousLogger.info("=================================================")
anonymousLogger.info("Running Test for : ")
anonymousLogger.info(" suite : " + suiteName)
anonymousLogger.info(" module : " + moduleName)
anonymousLogger.info(" class : " + className)
anonymousLogger.info("=================================================")
# dynamically loading a module and starting the test class
mod = __import__(moduleName)
theClass = getattr(mod, className)
instance = theClass(systemTestEnv)
instance.runTest()
print
anonymousLogger.info("=================================================")
anonymousLogger.info(" TEST REPORTS")
anonymousLogger.info("=================================================")
for systemTestResult in systemTestEnv.systemTestResultsList:
for key,val in systemTestResult.items():
if key == "validation_status":
anonymousLogger.info(key + " : ")
for validation, status in val.items():
anonymousLogger.info(" " + validation + " : " + status)
else:
anonymousLogger.info(key + " : " + val)
print
# =========================
# main entry point
# =========================
main()

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,681 @@
#!/usr/bin/env python
# ===================================
# kafka_system_test_utils.py
# ===================================
import datetime
import inspect
import json
import logging
import os
import re
import subprocess
import sys
import time
import traceback
import system_test_utils
from datetime import datetime
from time import mktime
# ====================================================================
# Two logging formats are defined in system_test/system_test_runner.py
# ====================================================================
# 1. "namedLogger" is defined to log message in this format:
# "%(asctime)s - %(levelname)s - %(message)s %(name_of_class)s"
# usage: to log message and showing the class name of the message
logger = logging.getLogger("namedLogger")
thisClassName = '(kafka_system_test_utils)'
d = {'name_of_class': thisClassName}
# 2. "anonymousLogger" is defined to log message in this format:
# "%(asctime)s - %(levelname)s - %(message)s"
# usage: to log message without showing class name and it's appropriate
# for logging generic message such as "sleeping for 5 seconds"
anonLogger = logging.getLogger("anonymousLogger")
# =====================================
# Sample usage of getting testcase env
# =====================================
def get_testcase_env(testcaseEnv):
anonLogger.info("================================================")
anonLogger.info("systemTestBaseDir : " + testcaseEnv.systemTestBaseDir)
anonLogger.info("testSuiteBaseDir : " + testcaseEnv.testSuiteBaseDir)
anonLogger.info("testCaseBaseDir : " + testcaseEnv.testCaseBaseDir)
anonLogger.info("testCaseLogsDir : " + testcaseEnv.testCaseLogsDir)
anonLogger.info("userDefinedEnvVarDict : (testcaseEnv.userDefinedEnvVarDict)")
anonLogger.info("================================================")
def get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, type):
defaultLogDir = testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId
# type is either "metrics" or "dashboards" or "default"
if type == "metrics":
return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId + "/metrics"
elif type == "default" :
return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId
elif type == "dashboards":
return testcaseEnv.testCaseLogsDir + "/dashboards"
elif type == "config":
return testcaseEnv.testCaseBaseDir + "/config"
else:
logger.error("unrecognized log directory type : " + type, extra=d)
logger.error("returning default log dir : " + defaultLogDir, extra=d)
return defaultLogDir
def generate_testcase_log_dirs(systemTestEnv, testcaseEnv):
testcasePathName = testcaseEnv.testCaseBaseDir
logger.debug("testcase pathname: " + testcasePathName, extra=d)
if not os.path.exists(testcasePathName + "/config") : os.makedirs(testcasePathName + "/config")
if not os.path.exists(testcasePathName + "/logs") : os.makedirs(testcasePathName + "/logs")
dashboardsPathName = testcaseEnv.testCaseLogsDir + "/dashboards"
if not os.path.exists(dashboardsPathName) : os.makedirs(dashboardsPathName)
for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
entityId = clusterEntityConfigDict["entity_id"]
role = clusterEntityConfigDict["role"]
logger.debug("entity_id : " + entityId, extra=d)
logger.debug("role : " + role, extra=d)
metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
if not os.path.exists(metricsPathName) : os.makedirs(metricsPathName)
def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
anonLogger.info("================================================")
anonLogger.info("collecting logs from remote machines")
anonLogger.info("================================================")
testCaseBaseDir = testcaseEnv.testCaseBaseDir
for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
hostname = clusterEntityConfigDict["hostname"]
entity_id = clusterEntityConfigDict["entity_id"]
role = clusterEntityConfigDict["role"]
logger.debug("entity_id : " + entity_id, extra=d)
logger.debug("hostname : " + hostname, extra=d)
logger.debug("role : " + role, extra=d)
configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics")
dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
cmdList = ["scp",
hostname + ":" + metricsPathName + "/*",
metricsPathName]
cmdStr = " ".join(cmdList)
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv):
testCaseBaseDir = testcaseEnv.testCaseBaseDir
for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
hostname = clusterEntityConfigDict["hostname"]
entity_id = clusterEntityConfigDict["entity_id"]
role = clusterEntityConfigDict["role"]
logger.debug("entity_id : " + entity_id, extra=d)
logger.debug("hostname : " + hostname, extra=d)
logger.debug("role : " + role, extra=d)
configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics")
dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
cmdList = ["ssh " + hostname,
"'mkdir -p",
configPathName,
metricsPathName,
dashboardsPathName + "'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
def init_entity_props(systemTestEnv, testcaseEnv):
clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
testcaseConfigsList = testcaseEnv.testcaseConfigsList
testcasePathName = testcaseEnv.testCaseBaseDir
# consumer config / log files location
consEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
clusterConfigsList, "role", "console_consumer", "entity_id")
consLogList = system_test_utils.get_data_from_list_of_dicts( \
testcaseConfigsList, "entity_id", consEntityIdList[0], "log_filename")
consLogPathname = testcasePathName + "/logs/" + consLogList[0]
consCfgList = system_test_utils.get_data_from_list_of_dicts( \
testcaseConfigsList, "entity_id", consEntityIdList[0], "config_filename")
consCfgPathname = testcasePathName + "/config/" + consCfgList[0]
# producer config / log files location
prodEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
clusterConfigsList, "role", "producer_performance", "entity_id")
prodLogList = system_test_utils.get_data_from_list_of_dicts( \
testcaseConfigsList, "entity_id", prodEntityIdList[0], "log_filename")
prodLogPathname = testcasePathName + "/logs/" + prodLogList[0]
prodCfgList = system_test_utils.get_data_from_list_of_dicts( \
testcaseConfigsList, "entity_id", prodEntityIdList[0], "config_filename")
prodCfgPathname = testcasePathName + "/config/" + prodCfgList[0]
testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consLogPathname
testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"] = consCfgPathname
testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = prodLogPathname
testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"] = prodCfgPathname
def copy_file_with_dict_values(srcFile, destFile, dictObj):
infile = open(srcFile, "r")
inlines = infile.readlines()
infile.close()
outfile = open(destFile, 'w')
for line in inlines:
for key in dictObj.keys():
if (line.startswith(key + "=")):
line = key + "=" + dictObj[key] + "\n"
outfile.write(line)
outfile.close()
def start_metrics_collection(jmxHost, jmxPort, mBeanObjectName, mBeanAttributes, entityId, clusterEntityConfigDictList, testcaseEnv):
logger.info("starting metrics collection on jmx port: " + jmxPort, extra=d)
jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", entityId, "metrics")
startMetricsCmdList = ["ssh " + jmxHost,
"'JAVA_HOME=" + javaHome,
"JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
"--jmx-url " + jmxUrl,
"--object-name " + mBeanObjectName + " &> ",
metricsPathName + "/metrics.csv & echo pid:$! > ",
metricsPathName + "/entity_pid'"]
startMetricsCommand = " ".join(startMetricsCmdList)
logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
system_test_utils.async_sys_call(startMetricsCommand)
pidCmdStr = "ssh " + jmxHost + " 'cat " + metricsPathName + "/entity_pid'"
logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
# keep track of the remote entity pid in a dictionary
for line in subproc.stdout.readlines():
if line.startswith("pid"):
line = line.rstrip('\n')
logger.debug("found pid line: [" + line + "]", extra=d)
tokens = line.split(':')
thisPid = tokens[1]
testcaseEnv.entityParentPidDict[thisPid] = thisPid
def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
logger.info("calling generate_properties_files", extra=d)
clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
tcPathname = testcaseEnv.testCaseBaseDir
tcConfigsList = testcaseEnv.testcaseConfigsList
cfgTemplatePathname = os.path.abspath(testsuitePathname + "/config")
cfgDestPathname = os.path.abspath(tcPathname + "/config")
logger.info("config template (source) pathname : " + cfgTemplatePathname, extra=d)
logger.info("testcase config (dest) pathname : " + cfgDestPathname, extra=d)
# loop through all zookeepers (if more than 1) to retrieve host and clientPort
# to construct a zk.connect str for broker in the form of:
# zk.connect=<host1>:<port2>,<host2>:<port2>
zkConnectStr = ""
zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
for zkDict in zkDictList:
entityID = zkDict["entity_id"]
hostname = zkDict["hostname"]
clientPortList = system_test_utils.get_data_from_list_of_dicts(tcConfigsList, "entity_id", entityID, "clientPort")
clientPort = clientPortList[0]
if ( zkConnectStr.__len__() == 0 ):
zkConnectStr = hostname + ":" + clientPort
else:
zkConnectStr = zkConnectStr + "," + hostname + ":" + clientPort
# for each entity in the cluster config
for clusterCfg in clusterConfigsList:
cl_entity_id = clusterCfg["entity_id"]
for tcCfg in tcConfigsList:
if (tcCfg["entity_id"] == cl_entity_id):
# copy the associated .properties template, update values, write to testcase_<xxx>/config
if ( clusterCfg["role"] == "broker" ):
tcCfg["zk.connect"] = zkConnectStr
copy_file_with_dict_values(cfgTemplatePathname + "/server.properties", \
cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
elif ( clusterCfg["role"] == "zookeeper"):
copy_file_with_dict_values(cfgTemplatePathname + "/zookeeper.properties", \
cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
elif ( clusterCfg["role"] == "producer_performance"):
tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties", \
cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
elif ( clusterCfg["role"] == "console_consumer"):
tcCfg["zookeeper"] = zkConnectStr
copy_file_with_dict_values(cfgTemplatePathname + "/console_consumer.properties", \
cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
else:
print " => ", tcCfg
print "UNHANDLED key"
# scp updated config files to remote hosts
scp_file_to_remote_host(clusterConfigsList, testcaseEnv)
def scp_file_to_remote_host(clusterEntityConfigDictList, testcaseEnv):
testcaseConfigsList = testcaseEnv.testcaseConfigsList
for clusterEntityConfigDict in clusterEntityConfigDictList:
hostname = clusterEntityConfigDict["hostname"]
testcasePathName = testcaseEnv.testCaseBaseDir
cmdStr = "scp " + testcasePathName + "/config/* " + hostname + ":" + testcasePathName + "/config"
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
def start_zookeepers(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
zkEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
for zkEntityId in zkEntityIdList:
start_entity_in_background(systemTestEnv, testcaseEnv, zkEntityId)
def start_brokers(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
clusterEntityConfigDictList, "role", "broker", "entity_id")
for brokerEntityId in brokerEntityIdList:
start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
def get_leader_elected_log_line(systemTestEnv, testcaseEnv):
logger.info("looking up leader...", extra=d)
# keep track of leader related data in this dict such as broker id,
# entity id and timestamp and return it to the caller function
leaderDict = {}
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
clusterEntityConfigDictList, "role", "broker", "entity_id")
for brokerEntityId in brokerEntityIdList:
hostname = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
logFile = system_test_utils.get_data_by_lookup_keyval( \
testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
leaderDict["entity_id"] = brokerEntityId
leaderDict["hostname"] = hostname
logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
cmdStrList = ["ssh " + hostname,
"\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
logPathName + "/" + logFile + " | ",
"sort | tail -1\""]
cmdStr = " ".join(cmdStrList)
logger.debug("executing command [" + cmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(cmdStr)
for line in subproc.stdout.readlines():
line = line.rstrip('\n')
if testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] in line:
logger.info("found the log line : " + line, extra=d)
try:
matchObj = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"], line)
datetimeStr = matchObj.group(1)
datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
#print "{0:.3f}".format(unixTs)
leaderDict["timestamp"] = unixTs
leaderDict["brokerid"] = matchObj.group(2)
leaderDict["topic"] = matchObj.group(3)
leaderDict["partition"] = matchObj.group(4)
except:
logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
#else:
# logger.debug("unmatched line found [" + line + "]", extra=d)
return leaderDict
def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
# cluster configurations:
hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
role = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "role")
kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
jmxPort = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
# testcase configurations:
testcaseConfigsList = testcaseEnv.testcaseConfigsList
clientPort = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "clientPort")
configFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "config_filename")
logFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename")
logger.info("starting " + role + " in host [" + hostname + "] on client port [" + clientPort + "]", extra=d)
configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "config")
logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "default")
if role == "zookeeper":
cmdList = ["ssh " + hostname,
"'JAVA_HOME=" + javaHome,
kafkaHome + "/bin/zookeeper-server-start.sh ",
configPathName + "/" + configFile + " &> ",
logPathName + "/" + logFile + " & echo pid:$! > ",
logPathName + "/entity_" + entityId + "_pid'"]
# construct zk.connect str and update it to testcaseEnv.userDefinedEnvVarDict.zkConnectStr
if ( len(testcaseEnv.userDefinedEnvVarDict["zkConnectStr"]) > 0 ):
testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = \
testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] + "," + hostname + ":" + clientPort
else:
testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = hostname + ":" + clientPort
elif role == "broker":
cmdList = ["ssh " + hostname,
"'JAVA_HOME=" + javaHome,
"JMX_PORT=" + jmxPort,
kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
configPathName + "/" + configFile + " &> ",
logPathName + "/" + logFile + " & echo pid:$! > ",
logPathName + "/entity_" + entityId + "_pid'"]
# it seems to be a better idea to launch producer & consumer in separate functions
# elif role == "producer_performance":
# elif role == "console_consumer":
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
system_test_utils.async_sys_call(cmdStr)
time.sleep(5)
pidCmdStr = "ssh " + hostname + " 'cat " + logPathName + "/entity_" + entityId + "_pid'"
logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
# keep track of the remote entity pid in a dictionary
for line in subproc.stdout.readlines():
if line.startswith("pid"):
line = line.rstrip('\n')
logger.debug("found pid line: [" + line + "]", extra=d)
tokens = line.split(':')
testcaseEnv.entityParentPidDict[entityId] = tokens[1]
# if it is a broker, start metric collection
if role == "broker":
start_metrics_collection(hostname, jmxPort, "kafka:type=kafka.SocketServerStats", \
"AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv)
def start_console_consumer(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
for consumerConfig in consumerConfigList:
host = consumerConfig["hostname"]
entityId = consumerConfig["entity_id"]
kafkaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "java_home")
kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
logger.info("starting console consumer", extra=d)
consumerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + \
"/console_consumer.log"
testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName
commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"])
cmdList = ["ssh " + host,
"'JAVA_HOME=" + javaHome,
kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
commandArgs + " &> " + consumerLogPathName + "'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
def start_producer_performance(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
producerConfigList = system_test_utils.get_dict_from_list_of_dicts( \
clusterEntityConfigDictList, "role", "producer_performance")
for producerConfig in producerConfigList:
host = producerConfig["hostname"]
entityId = producerConfig["entity_id"]
kafkaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "java_home")
kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
logger.info("starting producer preformance", extra=d)
producerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + \
"/producer_performance.log"
testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName
commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"])
cmdList = ["ssh " + host,
"'JAVA_HOME=" + javaHome,
kafkaRunClassBin + " kafka.perf.ProducerPerformance",
commandArgs + " &> " + producerLogPathName + "'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
def stop_remote_entity(systemTestEnv, entityId, parentPid):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid)
logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
system_test_utils.sigterm_remote_process(hostname, pidStack)
time.sleep(1)
system_test_utils.sigkill_remote_process(hostname, pidStack)
def create_topic(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts( \
clusterEntityConfigDictList, "role", "producer_performance")
prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts( \
testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"])
prodTopicList = prodPerfCfgDict[0]["topic"].split(',')
zkEntityId = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
zkHost = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "role", "zookeeper", "hostname")
kafkaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
createTopicBin = kafkaHome + "/bin/kafka-create-topic.sh"
logger.info("zkEntityId : " + zkEntityId, extra=d)
logger.info("createTopicBin : " + createTopicBin, extra=d)
for topic in prodTopicList:
logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] + "]", extra=d)
cmdList = ["ssh " + zkHost,
"'JAVA_HOME=" + javaHome,
createTopicBin,
" --topic " + topic,
" --zookeeper " + testcaseEnv.userDefinedEnvVarDict["zkConnectStr"],
" --replica " + testcaseEnv.testcaseArgumentsDict["replica_factor"],
" --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " &> ",
testcaseEnv.testCaseBaseDir + "/logs/create_topic.log'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(cmdStr)
def get_message_id(logPathName):
logLines = open(logPathName, "r").readlines()
messageIdList = []
for line in logLines:
if not "MessageID" in line:
continue
else:
matchObj = re.match('.*MessageID:(.*?):', line)
messageIdList.append( matchObj.group(1) )
return messageIdList
def validate_data_matched(systemTestEnv, testcaseEnv):
validationStatusDict = testcaseEnv.validationStatusDict
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
producerEntityId = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "role", "producer_performance", "entity_id")
consumerEntityId = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \
testcaseEnv, "console_consumer", consumerEntityId, "default") + \
"/msg_id_missing_in_consumer.log"
producerMsgIdList = get_message_id(testcaseEnv.userDefinedEnvVarDict["producerLogPathName"])
consumerMsgIdList = get_message_id(testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"])
producerMsgIdSet = set(producerMsgIdList)
consumerMsgIdSet = set(consumerMsgIdList)
missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
outfile = open(msgIdMissingInConsumerLogPathName, "w")
for id in missingMsgIdInConsumer:
outfile.write(id + "\n")
outfile.close()
logger.info("no. of unique messages sent from publisher : " + str(len(producerMsgIdSet)), extra=d)
logger.info("no. of unique messages received by consumer : " + str(len(producerMsgIdSet)), extra=d)
if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
validationStatusDict["Validate for data matched"] = "PASSED"
return True
else:
validationStatusDict["Validate for data matched"] = "FAILED"
logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)
return False
def validate_leader_election_successful(testcaseEnv, leaderDict, validationStatusDict):
if ( len(leaderDict) > 0 ):
try:
leaderBrokerId = leaderDict["brokerid"]
leaderEntityId = leaderDict["entity_id"]
leaderPid = testcaseEnv.entityParentPidDict[leaderEntityId]
hostname = leaderDict["hostname"]
logger.info("found leader in entity [" + leaderEntityId + "] with brokerid [" + \
leaderBrokerId + "] for partition [" + leaderDict["partition"] + "]", extra=d)
validationStatusDict["Validate leader election successful"] = "PASSED"
return True
except Exception, e:
logger.error("leader info not completed:", extra=d)
validationStatusDict["Validate leader election successful"] = "FAILED"
print leaderDict
print e
return False
else:
validationStatusDict["Validate leader election successful"] = "FAILED"
return False
def cleanup_data_at_remote_hosts(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
testcaseConfigsList = testcaseEnv.testcaseConfigsList
for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
hostname = clusterEntityConfigDict["hostname"]
entityId = clusterEntityConfigDict["entity_id"]
role = clusterEntityConfigDict["role"]
#testcasePathName = testcaseEnv.testcaseBaseDir
cmdStr = ""
dataDir = ""
logger.info("cleaning up data dir on host: [" + hostname + "]", extra=d)
if role == 'zookeeper':
dataDir = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "dataDir")
elif role == 'broker':
dataDir = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log.dir")
else:
logger.info("skipping role [" + role + "] on host : [" + hostname + "]", extra=d)
continue
cmdStr = "ssh " + hostname + " 'rm -r " + dataDir + "/*'"
if not dataDir.startswith("/tmp"):
logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
logger.warn("check config file: system_test/cluster_config.properties", extra=d)
logger.warn("aborting test...", extra=d)
sys.exit(1)
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)

View File

@ -0,0 +1,27 @@
#!/usr/bin/env python
import logging
import kafka_system_test_utils
import sys
class SetupUtils():
# dict to pass user-defined attributes to logger argument: "extra"
# to use: just update "thisClassName" to the appropriate value
thisClassName = '(ReplicaBasicTest)'
d = {'name_of_class': thisClassName}
logger = logging.getLogger("namedLogger")
anonLogger = logging.getLogger("anonymousLogger")
def __init__(self):
d = {'name_of_class': self.__class__.__name__}
self.logger.info("constructor", extra=SetUpUtils.d)
def log_message(self, message):
print
self.anonLogger.info("======================================================")
self.anonLogger.info(message)
self.anonLogger.info("======================================================")

View File

@ -0,0 +1,326 @@
#!/usr/bin/env python
# ===================================
# system_test_utils.py
# ===================================
import inspect
import json
import logging
import os
import signal
import subprocess
import sys
import time
logger = logging.getLogger("namedLogger")
thisClassName = '(system_test_utils)'
d = {'name_of_class': thisClassName}
def sys_call(cmdStr):
output = ""
#logger.info("executing command [" + cmdStr + "]", extra=d)
p = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in p.stdout.readlines():
output += line
return output
def remote_async_sys_call(host, cmd):
cmdStr = "ssh " + host + " \"" + cmd + "\""
logger.info("executing command [" + cmdStr + "]", extra=d)
async_sys_call(cmdStr)
def remote_sys_call(host, cmd):
cmdStr = "ssh " + host + " \"" + cmd + "\""
logger.info("executing command [" + cmdStr + "]", extra=d)
sys_call(cmdStr)
def get_dir_paths_with_prefix(fullPath, dirNamePrefix):
dirsList = []
for dirName in os.listdir(fullPath):
if not os.path.isfile(dirName) and dirName.startswith(dirNamePrefix):
dirsList.append(os.path.abspath(fullPath + "/" + dirName))
return dirsList
def get_testcase_prop_json_pathname(testcasePathName):
testcaseDirName = os.path.basename(testcasePathName)
return testcasePathName + "/" + testcaseDirName + "_properties.json"
def get_json_list_data(infile):
json_file_str = open(infile, "r").read()
json_data = json.loads(json_file_str)
data_list = []
for key,settings in json_data.items():
if type(settings) == list:
for setting in settings:
if type(setting) == dict:
kv_dict = {}
for k,v in setting.items():
kv_dict[k] = v
data_list.append(kv_dict)
return data_list
def get_dict_from_list_of_dicts(listOfDicts, lookupKey, lookupVal):
# {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
# {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'}
#
# Usage:
#
# 1. get_data_from_list_of_dicts(self.clusterConfigsList, "entity_id", "0", "role")
# returns:
# {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
#
# 2. get_data_from_list_of_dicts(self.clusterConfigsList, None, None, "role")
# returns:
# {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
# {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'}
retList = []
if ( lookupVal is None or lookupKey is None ):
for dict in listOfDicts:
for k,v in dict.items():
if ( k == fieldToRetrieve ): # match with fieldToRetrieve ONLY
retList.append( dict )
else:
for dict in listOfDicts:
for k,v in dict.items():
if ( k == lookupKey and v == lookupVal ): # match with lookupKey and lookupVal
retList.append( dict )
return retList
def get_data_from_list_of_dicts(listOfDicts, lookupKey, lookupVal, fieldToRetrieve):
# Sample List of Dicts:
# {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
# {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'}
#
# Usage:
# 1. get_data_from_list_of_dicts(self.clusterConfigsList, "entity_id", "0", "role")
# => returns ['zookeeper']
# 2. get_data_from_list_of_dicts(self.clusterConfigsList, None, None, "role")
# => returns ['zookeeper', 'broker']
retList = []
if ( lookupVal is None or lookupKey is None ):
for dict in listOfDicts:
for k,v in dict.items():
if ( k == fieldToRetrieve ): # match with fieldToRetrieve ONLY
try:
retList.append( dict[fieldToRetrieve] )
except:
logger.debug("field not found: " + fieldToRetrieve, extra=d)
else:
for dict in listOfDicts:
for k,v in dict.items():
if ( k == lookupKey and v == lookupVal ): # match with lookupKey and lookupVal
try:
retList.append( dict[fieldToRetrieve] )
except:
logger.debug("field not found: " + fieldToRetrieve, extra=d)
return retList
def get_data_by_lookup_keyval(listOfDict, lookupKey, lookupVal, fieldToRetrieve):
returnValue = ""
returnValuesList = get_data_from_list_of_dicts(listOfDict, lookupKey, lookupVal, fieldToRetrieve)
if len(returnValuesList) > 0:
returnValue = returnValuesList[0]
return returnValue
def get_json_dict_data(infile):
json_file_str = open(infile, "r").read()
json_data = json.loads(json_file_str)
data_dict = {}
for key,val in json_data.items():
if ( type(val) != list ):
data_dict[key] = val
return data_dict
def get_remote_child_processes(hostname, pid):
pidStack = []
cmdList = ['''ssh ''' + hostname,
''''pid=''' + pid + '''; prev_pid=""; echo $pid;''',
'''while [[ "x$pid" != "x" ]];''',
'''do prev_pid=$pid;''',
''' for child in $(ps -o pid,ppid ax | awk "{ if ( \$2 == $pid ) { print \$1 }}");''',
''' do echo $child; pid=$child;''',
''' done;''',
''' if [ $prev_pid == $pid ]; then''',
''' break;''',
''' fi;''',
'''done' 2> /dev/null''']
cmdStr = " ".join(cmdList)
logger.debug("executing command [" + cmdStr, extra=d)
subproc = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE)
for line in subproc.stdout.readlines():
procId = line.rstrip('\n')
pidStack.append(procId)
return pidStack
def get_child_processes(pid):
pidStack = []
currentPid = pid
parentPid = ""
pidStack.append(pid)
while ( len(currentPid) > 0 ):
psCommand = subprocess.Popen("ps -o pid --ppid %s --noheaders" % currentPid, shell=True, stdout=subprocess.PIPE)
psOutput = psCommand.stdout.read()
outputLine = psOutput.rstrip('\n')
childPid = outputLine.lstrip()
if ( len(childPid) > 0 ):
pidStack.append(childPid)
currentPid = childPid
else:
break
return pidStack
def sigterm_remote_process(hostname, pidStack):
while ( len(pidStack) > 0 ):
pid = pidStack.pop()
cmdStr = "ssh " + hostname + " 'kill -15 " + pid + "'"
try:
logger.debug("executing command [" + cmdStr + "]", extra=d)
sys_call_return_subproc(cmdStr)
except:
print "WARN - pid:",pid,"not found"
def sigkill_remote_process(hostname, pidStack):
while ( len(pidStack) > 0 ):
pid = pidStack.pop()
cmdStr = "ssh " + hostname + " 'kill -9 " + pid + "'"
try:
logger.debug("executing command [" + cmdStr + "]", extra=d)
sys_call_return_subproc(cmdStr)
except:
print "WARN - pid:",pid,"not found"
def terminate_process(pidStack):
while ( len(pidStack) > 0 ):
pid = pidStack.pop()
try:
os.kill(int(pid), signal.SIGTERM)
except:
print "WARN - pid:",pid,"not found"
def convert_keyval_to_cmd_args(configFilePathname):
cmdArg = ""
inlines = open(configFilePathname, "r").readlines()
for inline in inlines:
line = inline.rstrip()
tokens = line.split('=', 1)
if (len(tokens) == 2):
cmdArg = cmdArg + " --" + tokens[0] + " " + tokens[1]
elif (len(tokens) == 1):
cmdArg = cmdArg + " --" + tokens[0]
else:
print "ERROR: unexpected arguments list", line
return cmdArg
def async_sys_call(cmd_str):
subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def sys_call_return_subproc(cmd_str):
p = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return p
def remote_host_directory_exists(hostname, path):
cmdStr = "ssh " + hostname + " 'ls -d " + path + "'"
logger.debug("executing command: [" + cmdStr + "]", extra=d)
subproc = sys_call_return_subproc(cmdStr)
for line in subproc.stdout.readlines():
if "No such file or directory" in line:
return False
return True
def remote_host_processes_stopped(hostname):
cmdStr = "ssh " + hostname + \
" \"ps auxw | grep -v grep | grep -v Bootstrap | grep -i 'java\|run\-\|producer\|consumer\|jmxtool\|kafka' | wc -l\" 2> /dev/null"
logger.info("executing command: [" + cmdStr + "]", extra=d)
subproc = sys_call_return_subproc(cmdStr)
for line in subproc.stdout.readlines():
line = line.rstrip('\n')
logger.info("no. of running processes found : [" + line + "]", extra=d)
if line == '0':
return True
return False
def setup_remote_hosts(systemTestEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
for clusterEntityConfigDict in clusterEntityConfigDictList:
hostname = clusterEntityConfigDict["hostname"]
kafkaHome = clusterEntityConfigDict["kafka_home"]
javaHome = clusterEntityConfigDict["java_home"]
localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
logger.info("local kafka home : [" + localKafkaHome + "]", extra=d)
if kafkaHome != localKafkaHome:
logger.error("kafkaHome [" + kafkaHome + "] must be the same as [" + localKafkaHome + "] in host [" + hostname + "]", extra=d)
logger.error("please update cluster_config.json and run again. Aborting test ...", extra=d)
sys.exit(1)
#logger.info("checking running processes in host [" + hostname + "]", extra=d)
#if not remote_host_processes_stopped(hostname):
# logger.error("Running processes found in host [" + hostname + "]", extra=d)
# return False
logger.info("checking JAVA_HOME [" + javaHome + "] in host [" + hostname + "]", extra=d)
if not remote_host_directory_exists(hostname, javaHome):
logger.error("Directory not found: [" + javaHome + "] in host [" + hostname + "]", extra=d)
return False
logger.info("checking directory [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
if not remote_host_directory_exists(hostname, kafkaHome):
logger.info("Directory not found: [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
if hostname == "localhost":
return False
else:
localKafkaSourcePath = systemTestEnv.SYSTEM_TEST_BASE_DIR + "/.."
logger.info("copying local copy of [" + localKafkaSourcePath + "] to " + hostname + ":" + kafkaHome, extra=d)
copy_source_to_remote_hosts(hostname, localKafkaSourcePath, kafkaHome)
return True
def copy_source_to_remote_hosts(hostname, sourceDir, destDir):
cmdStr = "rsync -avz --delete-before " + sourceDir + "/ " + hostname + ":" + destDir
logger.info("executing command [" + cmdStr + "]", extra=d)
subproc = sys_call_return_subproc(cmdStr)
for line in subproc.stdout.readlines():
dummyVar = 1

View File

@ -0,0 +1,65 @@
#!/usr/bin/env python
# ===================================
# testcase_env.py
# ===================================
import json
import os
import sys
class TestcaseEnv():
# ================================
# Generic testcase environment
# ================================
# dictionary of entity parent pid
entityParentPidDict = {}
# list of testcase configs
testcaseConfigsList = []
# dictionary to keep track of testcase arguments such as replica_factor, num_partition
testcaseArgumentsDict = {}
def __init__(self, systemTestEnv, classInstance):
self.systemTestEnv = systemTestEnv
# gather the test case related info and add to an SystemTestEnv object
self.testcaseResultsDict = {}
self.testcaseResultsDict["test_class_name"] = classInstance.__class__.__name__
self.testcaseResultsDict["test_case_name"] = ""
self.validationStatusDict = {}
self.testcaseResultsDict["validation_status"] = self.validationStatusDict
self.systemTestEnv.systemTestResultsList.append(self.testcaseResultsDict)
# FIXME: in a distributed environement, kafkaBaseDir could be different in individual host
# => TBD
self.kafkaBaseDir = ""
self.systemTestBaseDir = systemTestEnv.SYSTEM_TEST_BASE_DIR
# to be initialized in the Test Module
self.testSuiteBaseDir = ""
self.testCaseBaseDir = ""
self.testCaseLogsDir = ""
# ================================
# dictionary to keep track of
# user-defined environment variables
# ================================
# LEADER_ELECTION_COMPLETED_MSG = "completed the leader state transition"
# REGX_LEADER_ELECTION_PATTERN = "\[(.*?)\] .* Broker (.*?) " + \
# LEADER_ELECTION_COMPLETED_MSG + \
# " for topic (.*?) partition (.*?) \(.*"
# zkConnectStr = ""
# consumerLogPathName = ""
# consumerConfigPathName = ""
# producerLogPathName = ""
# producerConfigPathName = ""
self.userDefinedEnvVarDict = {}