KAFKA-483 Improvements to the system testing framework; patched by John Fung; reviewed by Neha Narkhede

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1379232 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Neha Narkhede 2012-08-30 23:47:55 +00:00
parent d4f3eff171
commit 4cea1de21a
9 changed files with 443 additions and 120 deletions

View File

@ -1,9 +1,7 @@
# ==========================
# Known Issues:
# ==========================
1. The "broker-list" in system_test/replication_testsuite/testcase_1/testcase_1_properties.json needs to be manually updated to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
2. Sometimes the running processes may not be terminated properly by the script.
1. This test framework currently doesn't support MacOS due to different "ps" argument options from Linux. The correct ps execution is required to terminate the background running processes properly.
# ==========================
# Overview
@ -57,7 +55,8 @@ The framework has the following levels:
2. Edit system_test/replication_testsuite/testcase_1/testcase_1_properties.json and update "broker-list" to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
3. To run the test, go to <kafka_home>/system_test and run the following command:
$ python -B system_test_runner.py
4. To turn on debugging, update system_test/system_test_runner.py and uncomment the following line:
namedLogger.setLevel(logging.DEBUG)
# ==========================
# Adding Test Case

View File

@ -4,48 +4,48 @@
"entity_id": "0",
"hostname": "localhost",
"role": "zookeeper",
"kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"kafka_home": "default",
"java_home": "default",
"jmx_port": "9990"
},
{
"entity_id": "1",
"hostname": "localhost",
"role": "broker",
"kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"kafka_home": "default",
"java_home": "default",
"jmx_port": "9991"
},
{
"entity_id": "2",
"hostname": "localhost",
"role": "broker",
"kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"kafka_home": "default",
"java_home": "default",
"jmx_port": "9992"
},
{
"entity_id": "3",
"hostname": "localhost",
"role": "broker",
"kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"kafka_home": "default",
"java_home": "default",
"jmx_port": "9993"
},
{
"entity_id": "4",
"hostname": "localhost",
"role": "producer_performance",
"kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"kafka_home": "default",
"java_home": "default",
"jmx_port": "9994"
},
{
"entity_id": "5",
"hostname": "localhost",
"role": "console_consumer",
"kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"kafka_home": "default",
"java_home": "default",
"jmx_port": "9995"
}
]

View File

@ -1,4 +1,3 @@
broker-list=localhost:2181
topic=mytest
messages=200
message-size=100

View File

@ -44,6 +44,7 @@ class ReplicaBasicTest(SetupUtils):
testModuleAbsPathName = os.path.realpath(__file__)
testSuiteAbsPathName = os.path.abspath(os.path.dirname(testModuleAbsPathName))
isLeaderLogPattern = "Completed the leader state transition"
brokerShutDownCompletedPattern = "shut down completed"
def __init__(self, systemTestEnv):
@ -82,12 +83,16 @@ class ReplicaBasicTest(SetupUtils):
self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
# initialize self.testcaseEnv with user-defined environment
self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = \
ReplicaBasicTest.isLeaderLogPattern
self.testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] = ReplicaBasicTest.brokerShutDownCompletedPattern
self.testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"] = \
"\[(.*?)\] .* \[Kafka Server (.*?)\], " + ReplicaBasicTest.brokerShutDownCompletedPattern
self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = ReplicaBasicTest.isLeaderLogPattern
self.testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"] = \
"\[(.*?)\] .* Broker (.*?): " + \
self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + \
" for topic (.*?) partition (.*?) \(.*"
self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
# find testcase properties json file
@ -100,7 +105,7 @@ class ReplicaBasicTest(SetupUtils):
testcaseDirName = os.path.basename(testCasePathName)
self.testcaseEnv.testcaseResultsDict["test_case_name"] = testcaseDirName
#### => update testcaseEnv
# update testcaseEnv
self.testcaseEnv.testCaseBaseDir = testCasePathName
self.testcaseEnv.testCaseLogsDir = self.testcaseEnv.testCaseBaseDir + "/logs"
self.testcaseEnv.testCaseDashboardsDir = self.testcaseEnv.testCaseBaseDir + "/dashboards"
@ -110,7 +115,6 @@ class ReplicaBasicTest(SetupUtils):
for k,v in testcaseNonEntityDataDict.items():
if ( k == "description" ): testcaseDescription = v
#### => update testcaseEnv
# TestcaseEnv.testcaseArgumentsDict initialized, this dictionary keeps track of the
# "testcase_args" in the testcase_properties.json such as replica_factor, num_partition, ...
self.testcaseEnv.testcaseArgumentsDict = testcaseNonEntityDataDict["testcase_args"]
@ -124,13 +128,13 @@ class ReplicaBasicTest(SetupUtils):
# self.testcaseEnv.testCaseLogsDir
# self.testcaseEnv.testcaseArgumentsDict
print
# display testcase name and arguments
self.log_message("Test Case : " + testcaseDirName)
for k,v in self.testcaseEnv.testcaseArgumentsDict.items():
self.anonLogger.info(" " + k + " : " + v)
self.log_message("Description : " + testcaseDescription)
# ================================================================ #
# ================================================================ #
# Product Specific Testing Code Starts Here: #
@ -153,7 +157,7 @@ class ReplicaBasicTest(SetupUtils):
# clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
# generate remote hosts log/config dirs if not exist
kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
@ -196,63 +200,46 @@ class ReplicaBasicTest(SetupUtils):
# 'topic': 'test_1',
# 'brokerid': '3'}
# =============================================
# validate to see if leader election is successful
# =============================================
self.log_message("validating leader election")
result = kafka_system_test_utils.validate_leader_election_successful( \
self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
# checking to see if leader bouncing is required in this testcase
# =============================================
# get leader re-election latency
# =============================================
bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
self.log_message("bounce_leader flag : " + bounceLeaderFlag)
if (bounceLeaderFlag.lower() == "true"):
if self.testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
# no leader available for testing => skip this round
self.log_message("stopping all entities")
for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
continue
else:
# leader elected => stop leader
try:
leaderEntityId = leaderDict["entity_id"]
leaderBrokerId = leaderDict["brokerid"]
leaderPPid = self.testcaseEnv.entityParentPidDict[leaderEntityId]
except:
self.log_message("leader details unavailable")
self.log_message("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid)
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, leaderEntityId, leaderPPid)
self.testcaseEnv.entityParentPidDict[leaderEntityId] = ""
self.logger.info("sleeping for 5s for leader re-election to complete", extra=self.d)
time.sleep(5)
reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict)
# =============================================
# starting producer
self.log_message("starting producer")
# =============================================
self.log_message("starting producer in the background")
kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
self.anonLogger.info("sleeping for 10s")
time.sleep(10)
kafka_system_test_utils.stop_producer()
# =============================================
# starting previously terminated broker
if (bounceLeaderFlag.lower() == "true" and not self.testcaseEnv.entityParentPidDict[leaderEntityId]):
# =============================================
if bounceLeaderFlag.lower() == "true":
self.log_message("starting the previously terminated broker")
stoppedLeaderEntityId = leaderDict["entity_id"]
kafka_system_test_utils.start_entity_in_background(
self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
stoppedLeaderEntityId = leaderDict["entity_id"]
kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
self.anonLogger.info("sleeping for 5s")
time.sleep(5)
# =============================================
# starting consumer
self.log_message("starting consumer")
# =============================================
self.log_message("starting consumer in the background")
kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
self.anonLogger.info("sleeping for 10s")
time.sleep(10)
kafka_system_test_utils.stop_consumer()
# this testcase is completed - so stopping all entities
self.log_message("stopping all entities")
@ -260,6 +247,7 @@ class ReplicaBasicTest(SetupUtils):
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
# validate the data matched
# =============================================
self.log_message("validating data matched")
result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
@ -287,9 +275,27 @@ class ReplicaBasicTest(SetupUtils):
except Exception as e:
self.log_message("Exception while running test {0}".format(e))
traceback.print_exc()
traceback.print_exc()
finally:
self.log_message("stopping all entities")
for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, parentPid)
for entityId, jmxParentPidList in self.testcaseEnv.entityJmxParentPidDict.items():
for jmxParentPid in jmxParentPidList:
kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, jmxParentPid)
for hostname, consumerPPid in self.testcaseEnv.consumerHostParentPidDict.items():
consumerEntityId = system_test_utils.get_data_by_lookup_keyval( \
self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id")
kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, consumerEntityId, consumerPPid)
for hostname, producerPPid in self.testcaseEnv.producerHostParentPidDict.items():
producerEntityId = system_test_utils.get_data_by_lookup_keyval( \
self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id")
kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, producerEntityId, producerPPid)
#kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)

View File

@ -3,7 +3,7 @@
"testcase_args": {
"bounce_leader": "true",
"replica_factor": "3",
"num_partition": "2"
"num_partition": "1"
},
"entities": [
{
@ -47,7 +47,6 @@
"compression-codec": "1",
"message-size": "500",
"message": "500",
"broker-list": "localhost:9091,localhost:9092,localhost:9093",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},

View File

@ -21,6 +21,7 @@
# ===================================
import datetime
import getpass
import inspect
import json
import logging
@ -132,16 +133,40 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics")
dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "default")
# ==============================
# collect entity log file
# ==============================
cmdList = ["scp",
hostname + ":" + logPathName + "/*",
logPathName]
cmdStr = " ".join(cmdList)
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
# ==============================
# collect entity metrics file
# ==============================
cmdList = ["scp",
hostname + ":" + metricsPathName + "/*",
metricsPathName]
cmdStr = " ".join(cmdList)
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
# ==============================
# collect dashboards file
# ==============================
dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
cmdList = ["scp",
hostname + ":" + dashboardsPathName + "/*",
dashboardsPathName]
cmdStr = " ".join(cmdList)
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv):
testCaseBaseDir = testcaseEnv.testCaseBaseDir
@ -212,6 +237,7 @@ def copy_file_with_dict_values(srcFile, destFile, dictObj):
outfile.write(line)
outfile.close()
def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
logger.info("calling generate_properties_files", extra=d)
@ -259,7 +285,7 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv
cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
elif ( clusterCfg["role"] == "producer_performance"):
tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
#tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties", \
cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
@ -308,6 +334,62 @@ def start_brokers(systemTestEnv, testcaseEnv):
start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv):
logger.info("looking up broker shutdown...", extra=d)
# keep track of broker related data in this dict such as broker id,
# entity id and timestamp and return it to the caller function
shutdownBrokerDict = {}
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
clusterEntityConfigDictList, "role", "broker", "entity_id")
for brokerEntityId in brokerEntityIdList:
hostname = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
logFile = system_test_utils.get_data_by_lookup_keyval( \
testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
shutdownBrokerDict["entity_id"] = brokerEntityId
shutdownBrokerDict["hostname"] = hostname
logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
cmdStrList = ["ssh " + hostname,
"\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] + "' ",
logPathName + "/" + logFile + " | ",
"sort | tail -1\""]
cmdStr = " ".join(cmdStrList)
logger.debug("executing command [" + cmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(cmdStr)
for line in subproc.stdout.readlines():
line = line.rstrip('\n')
if testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] in line:
logger.info("found the log line : " + line, extra=d)
try:
matchObj = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"], line)
datetimeStr = matchObj.group(1)
datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
#print "{0:.3f}".format(unixTs)
shutdownBrokerDict["timestamp"] = unixTs
shutdownBrokerDict["brokerid"] = matchObj.group(2)
logger.info("brokerid: [" + shutdownBrokerDict["brokerid"] + "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d)
return shutdownBrokerDict
except:
logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
raise
#else:
# logger.debug("unmatched line found [" + line + "]", extra=d)
return shutdownBrokerDict
def get_leader_elected_log_line(systemTestEnv, testcaseEnv):
logger.info("looking up leader...", extra=d)
@ -327,9 +409,6 @@ def get_leader_elected_log_line(systemTestEnv, testcaseEnv):
logFile = system_test_utils.get_data_by_lookup_keyval( \
testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
leaderDict["entity_id"] = brokerEntityId
leaderDict["hostname"] = hostname
logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
cmdStrList = ["ssh " + hostname,
"\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
@ -351,12 +430,21 @@ def get_leader_elected_log_line(systemTestEnv, testcaseEnv):
datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
#print "{0:.3f}".format(unixTs)
leaderDict["timestamp"] = unixTs
leaderDict["brokerid"] = matchObj.group(2)
leaderDict["topic"] = matchObj.group(3)
leaderDict["partition"] = matchObj.group(4)
# update leaderDict when
# 1. leaderDict has no logline entry
# 2. leaderDict has existing logline entry but found another logline with more recent timestamp
if (len(leaderDict) > 0 and leaderDict["timestamp"] < unixTs) or (len(leaderDict) == 0):
leaderDict["timestamp"] = unixTs
leaderDict["brokerid"] = matchObj.group(2)
leaderDict["topic"] = matchObj.group(3)
leaderDict["partition"] = matchObj.group(4)
leaderDict["entity_id"] = brokerEntityId
leaderDict["hostname"] = hostname
logger.info("brokerid: [" + leaderDict["brokerid"] + "] entity_id: [" + leaderDict["entity_id"] + "]", extra=d)
except:
logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
raise
#else:
# logger.debug("unmatched line found [" + line + "]", extra=d)
@ -394,7 +482,6 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
logPathName + "/" + logFile + " & echo pid:$! > ",
logPathName + "/entity_" + entityId + "_pid'"]
# construct zk.connect str and update it to testcaseEnv.userDefinedEnvVarDict.zkConnectStr
if ( len(testcaseEnv.userDefinedEnvVarDict["zkConnectStr"]) > 0 ):
testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = \
@ -407,14 +494,10 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
"'JAVA_HOME=" + javaHome,
"JMX_PORT=" + jmxPort,
kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
configPathName + "/" + configFile + " &> ",
configPathName + "/" + configFile + " >> ",
logPathName + "/" + logFile + " & echo pid:$! > ",
logPathName + "/entity_" + entityId + "_pid'"]
# it seems to be a better idea to launch producer & consumer in separate functions
# elif role == "producer_performance":
# elif role == "console_consumer":
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
@ -432,6 +515,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
logger.debug("found pid line: [" + line + "]", extra=d)
tokens = line.split(':')
testcaseEnv.entityParentPidDict[entityId] = tokens[1]
#print "\n#### testcaseEnv.entityParentPidDict ", testcaseEnv.entityParentPidDict, "\n"
time.sleep(1)
metrics.start_metrics_collection(hostname, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
@ -451,12 +535,14 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "java_home")
jmxPort = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
logger.info("starting console consumer", extra=d)
consumerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + \
"/console_consumer.log"
consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default")
consumerLogPathName = consumerLogPath + "/console_consumer.log"
testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName
@ -465,18 +551,48 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
"'JAVA_HOME=" + javaHome,
"JMX_PORT=" + jmxPort,
kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
commandArgs + " &> " + consumerLogPathName + "'"]
commandArgs + " &> " + consumerLogPathName,
" & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
system_test_utils.async_sys_call(cmdStr)
time.sleep(2)
metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'"
logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
# keep track of the remote entity pid in a dictionary
for line in subproc.stdout.readlines():
if line.startswith("pid"):
line = line.rstrip('\n')
logger.debug("found pid line: [" + line + "]", extra=d)
tokens = line.split(':')
testcaseEnv.consumerHostParentPidDict[host] = tokens[1]
def start_producer_performance(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
testcaseConfigsList = testcaseEnv.testcaseConfigsList
brokerListStr = ""
# construct "broker-list" for producer
for clusterEntityConfigDict in clusterEntityConfigDictList:
entityRole = clusterEntityConfigDict["role"]
if entityRole == "broker":
hostname = clusterEntityConfigDict["hostname"]
entityId = clusterEntityConfigDict["entity_id"]
port = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "port")
if len(brokerListStr) == 0:
brokerListStr = hostname + ":" + port
else:
brokerListStr = brokerListStr + "," + hostname + ":" + port
producerConfigList = system_test_utils.get_dict_from_list_of_dicts( \
clusterEntityConfigDictList, "role", "producer_performance")
@ -489,12 +605,14 @@ def start_producer_performance(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "java_home")
jmxPort = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
logger.info("starting producer preformance", extra=d)
producerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + \
"/producer_performance.log"
producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default")
producerLogPathName = producerLogPath + "/producer_performance.log"
testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName
@ -503,7 +621,9 @@ def start_producer_performance(systemTestEnv, testcaseEnv):
"'JAVA_HOME=" + javaHome,
"JMX_PORT=" + jmxPort,
kafkaRunClassBin + " kafka.perf.ProducerPerformance",
commandArgs + " &> " + producerLogPathName + "'"]
"--broker-list " +brokerListStr,
commandArgs + " &> " + producerLogPathName,
" & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
@ -511,6 +631,18 @@ def start_producer_performance(systemTestEnv, testcaseEnv):
time.sleep(1)
metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
pidCmdStr = "ssh " + host + " 'cat " + producerLogPath + "/entity_" + entityId + "_pid'"
logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
# keep track of the remote entity pid in a dictionary
for line in subproc.stdout.readlines():
if line.startswith("pid"):
line = line.rstrip('\n')
logger.debug("found pid line: [" + line + "]", extra=d)
tokens = line.split(':')
testcaseEnv.producerHostParentPidDict[host] = tokens[1]
def stop_remote_entity(systemTestEnv, entityId, parentPid):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
@ -519,28 +651,32 @@ def stop_remote_entity(systemTestEnv, entityId, parentPid):
pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid)
logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
# system_test_utils.sigterm_remote_process(hostname, pidStack)
system_test_utils.sigterm_remote_process(hostname, pidStack)
# time.sleep(1)
# system_test_utils.sigkill_remote_process(hostname, pidStack)
def force_stop_remote_entity(systemTestEnv, entityId, parentPid):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid)
logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
system_test_utils.sigkill_remote_process(hostname, pidStack)
def create_topic(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts( \
clusterEntityConfigDictList, "role", "producer_performance")
prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts( \
testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"])
prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"])
prodTopicList = prodPerfCfgDict[0]["topic"].split(',')
zkEntityId = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
zkHost = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "role", "zookeeper", "hostname")
kafkaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
zkEntityId = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
zkHost = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname")
kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
createTopicBin = kafkaHome + "/bin/kafka-create-topic.sh"
logger.info("zkEntityId : " + zkEntityId, extra=d)
@ -628,6 +764,8 @@ def validate_leader_election_successful(testcaseEnv, leaderDict, validationStatu
except Exception, e:
logger.error("leader info not completed: {0}".format(e), extra=d)
traceback.print_exc()
print leaderDict
traceback.print_exc()
validationStatusDict["Validate leader election successful"] = "FAILED"
return False
else:
@ -645,7 +783,8 @@ def cleanup_data_at_remote_hosts(systemTestEnv, testcaseEnv):
hostname = clusterEntityConfigDict["hostname"]
entityId = clusterEntityConfigDict["entity_id"]
role = clusterEntityConfigDict["role"]
#testcasePathName = testcaseEnv.testcaseBaseDir
kafkaHome = clusterEntityConfigDict["kafka_home"]
testCaseBaseDir = testcaseEnv.testCaseBaseDir
cmdStr = ""
dataDir = ""
@ -667,17 +806,102 @@ def cleanup_data_at_remote_hosts(systemTestEnv, testcaseEnv):
logger.warn("aborting test...", extra=d)
sys.exit(1)
# ============================
# cleaning data dir
# ============================
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
# ============================
# cleaning log/metrics/svg, ...
# ============================
if system_test_utils.remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
# so kafkaHome is a real kafka installation
cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.log' | xargs rm 2> /dev/null\""
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*_pid' | xargs rm 2> /dev/null\""
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.csv' | xargs rm 2> /dev/null\""
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.svg' | xargs rm 2> /dev/null\""
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.html' | xargs rm 2> /dev/null\""
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
def get_entity_log_directory(testCaseBaseDir, entity_id, role):
return testCaseBaseDir + "/logs/" + role + "-" + entity_id
def get_entities_for_role(clusterConfig, role):
return filter(lambda entity: entity['role'] == role, clusterConfig)
def stop_consumer():
system_test_utils.sys_call("ps -ef | grep ConsoleConsumer | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")
def stop_producer():
system_test_utils.sys_call("ps -ef | grep ProducerPerformance | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")
def ps_grep_terminate_running_entity(systemTestEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
username = getpass.getuser()
for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
hostname = clusterEntityConfigDict["hostname"]
cmdList = ["ssh " + hostname,
"\"ps auxw | grep -v grep | grep -v Bootstrap | grep -v vim | grep ^" + username,
"| grep -i 'java\|server\-start\|run\-\|producer\|consumer\|jmxtool' | grep kafka",
"| tr -s ' ' | cut -f2 -d ' ' | xargs kill -9" + "\""]
cmdStr = " ".join(cmdList)
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict):
leaderEntityId = None
leaderBrokerId = None
leaderPPid = None
shutdownLeaderTimestamp = None
if testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
# leader election is not successful - something is wrong => so skip this testcase
#continue
return None
else:
# leader elected => stop leader
try:
leaderEntityId = leaderDict["entity_id"]
leaderBrokerId = leaderDict["brokerid"]
leaderPPid = testcaseEnv.entityParentPidDict[leaderEntityId]
except:
logger.info("leader details unavailable", extra=d)
raise
logger.info("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid, extra=d)
stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid)
logger.info("sleeping for 5s for leader re-election to complete", extra=d)
time.sleep(5)
# get broker shut down completed timestamp
shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv)
#print shutdownBrokerDict
logger.info("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownBrokerDict["timestamp"])), extra=d)
logger.info("looking up new leader", extra=d)
leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv)
#print leaderDict2
logger.info("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])), extra=d)
leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownBrokerDict["timestamp"])
logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
testcaseEnv.validationStatusDict["Leader Election Latency"] = str("{0:.2f}".format(leaderReElectionLatency * 1000)) + " ms"
return leaderReElectionLatency

View File

@ -31,6 +31,8 @@ import traceback
import csv
import time
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
from collections import namedtuple
import numpy
@ -78,6 +80,8 @@ def ensure_valid_headers(headers, attributes):
attributeColumnIndex = headers.index(attributes)
return attributeColumnIndex
except ValueError as ve:
#print "#### attributes : ", attributes
#print "#### headers : ", headers
raise Exception("There should be exactly one column that matches attribute: {0} in".format(attributes) +
" headers: {0}".format(",".join(headers)))
@ -119,6 +123,7 @@ def plot_graphs(inputCsvFiles, labels, title, xLabel, yLabel, attribute, outputG
plots.append(p1)
except Exception as e:
logger.error("ERROR while plotting data for {0}: {1}".format(inputCsvFile, e), extra=d)
traceback.print_exc()
# find xmin, xmax, ymin, ymax from all csv files
xmin = min(map(lambda coord: coord.x, coordinates))
xmax = max(map(lambda coord: coord.x, coordinates))
@ -172,6 +177,7 @@ def draw_graph_for_role(graphs, entities, role, testcaseEnv):
# print "Finished plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
except Exception as e:
logger.error("ERROR while plotting graph {0}: {1}".format(outputGraphFile, e), extra=d)
traceback.print_exc()
def build_all_dashboards(metricsDefinitionFile, testcaseDashboardsDir, clusterConfig):
metricsHtmlFile = testcaseDashboardsDir + "/metrics.html"
@ -231,19 +237,29 @@ def start_metrics_collection(jmxHost, jmxPort, role, entityId, systemTestEnv, te
startMetricsCommand = " ".join(startMetricsCmdList)
logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
system_test_utils.async_sys_call(startMetricsCommand)
pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid'"
time.sleep(1)
pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid 2> /dev/null'"
logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
# keep track of the remote entity pid in a dictionary
# keep track of JMX ppid in a dictionary of entity_id to list of JMX ppid
# testcaseEnv.entityJmxParentPidDict:
# key: entity_id
# val: list of JMX ppid associated to that entity_id
# { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
for line in subproc.stdout.readlines():
line = line.rstrip('\n')
logger.debug("line: [" + line + "]", extra=d)
if line.startswith("pid"):
line = line.rstrip('\n')
logger.debug("found pid line: [" + line + "]", extra=d)
tokens = line.split(':')
thisPid = tokens[1]
testcaseEnv.entityParentPidDict[thisPid] = thisPid
if entityId not in testcaseEnv.entityJmxParentPidDict:
testcaseEnv.entityJmxParentPidDict[entityId] = []
testcaseEnv.entityJmxParentPidDict[entityId].append(thisPid)
#print "\n#### testcaseEnv.entityJmxParentPidDict ", testcaseEnv.entityJmxParentPidDict, "\n"
def stop_metrics_collection(jmxHost, jmxPort):
logger.info("stopping metrics collection on " + jmxHost + ":" + jmxPort, extra=d)

View File

@ -24,7 +24,9 @@ import inspect
import json
import logging
import os
import re
import signal
import socket
import subprocess
import sys
import time
@ -34,6 +36,15 @@ thisClassName = '(system_test_utils)'
d = {'name_of_class': thisClassName}
def get_current_unix_timestamp():
ts = time.time()
return "{0:.6f}".format(ts)
def get_local_hostname():
return socket.gethostname()
def sys_call(cmdStr):
output = ""
#logger.info("executing command [" + cmdStr + "]", extra=d)
@ -218,6 +229,7 @@ def sigterm_remote_process(hostname, pidStack):
sys_call_return_subproc(cmdStr)
except:
print "WARN - pid:",pid,"not found"
raise
def sigkill_remote_process(hostname, pidStack):
@ -231,6 +243,7 @@ def sigkill_remote_process(hostname, pidStack):
sys_call_return_subproc(cmdStr)
except:
print "WARN - pid:",pid,"not found"
raise
def terminate_process(pidStack):
@ -240,6 +253,7 @@ def terminate_process(pidStack):
os.kill(int(pid), signal.SIGTERM)
except:
print "WARN - pid:",pid,"not found"
raise
def convert_keyval_to_cmd_args(configFilePathname):
@ -267,6 +281,17 @@ def sys_call_return_subproc(cmd_str):
return p
def remote_host_file_exists(hostname, pathname):
cmdStr = "ssh " + hostname + " 'ls " + pathname + "'"
logger.debug("executing command: [" + cmdStr + "]", extra=d)
subproc = sys_call_return_subproc(cmdStr)
for line in subproc.stdout.readlines():
if "No such file or directory" in line:
return False
return True
def remote_host_directory_exists(hostname, path):
cmdStr = "ssh " + hostname + " 'ls -d " + path + "'"
logger.debug("executing command: [" + cmdStr + "]", extra=d)
@ -296,24 +321,39 @@ def remote_host_processes_stopped(hostname):
def setup_remote_hosts(systemTestEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
localJavaBin = ""
localJavaHome = ""
subproc = sys_call_return_subproc("which java")
for line in subproc.stdout.readlines():
if line.startswith("which: no "):
logger.error("No Java binary found in local host", extra=d)
return False
else:
line = line.rstrip('\n')
localJavaBin = line
matchObj = re.match("(.*)\/bin\/java$", line)
localJavaHome = matchObj.group(1)
listIndex = -1
for clusterEntityConfigDict in clusterEntityConfigDictList:
listIndex += 1
hostname = clusterEntityConfigDict["hostname"]
kafkaHome = clusterEntityConfigDict["kafka_home"]
javaHome = clusterEntityConfigDict["java_home"]
localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
logger.info("local kafka home : [" + localKafkaHome + "]", extra=d)
if kafkaHome != localKafkaHome:
logger.error("kafkaHome [" + kafkaHome + "] must be the same as [" + localKafkaHome + "] in host [" + hostname + "]", extra=d)
logger.error("please update cluster_config.json and run again. Aborting test ...", extra=d)
sys.exit(1)
if hostname == "localhost" and javaHome == "default":
clusterEntityConfigDictList[listIndex]["java_home"] = localJavaHome
#logger.info("checking running processes in host [" + hostname + "]", extra=d)
#if not remote_host_processes_stopped(hostname):
# logger.error("Running processes found in host [" + hostname + "]", extra=d)
# return False
if hostname == "localhost" and kafkaHome == "default":
clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome
logger.info("checking JAVA_HOME [" + javaHome + "] in host [" + hostname + "]", extra=d)
kafkaHome = clusterEntityConfigDict["kafka_home"]
javaHome = clusterEntityConfigDict["java_home"]
logger.info("checking java binary [" + localJavaBin + "] in host [" + hostname + "]", extra=d)
if not remote_host_directory_exists(hostname, javaHome):
logger.error("Directory not found: [" + javaHome + "] in host [" + hostname + "]", extra=d)
return False
@ -339,3 +379,22 @@ def copy_source_to_remote_hosts(hostname, sourceDir, destDir):
for line in subproc.stdout.readlines():
dummyVar = 1
def remove_kafka_home_dir_at_remote_hosts(hostname, kafkaHome):
if remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
cmdStr = "ssh " + hostname + " 'chmod -R 777 " + kafkaHome + "'"
logger.info("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
cmdStr = "ssh " + hostname + " 'rm -r " + kafkaHome + "'"
logger.info("executing command [" + cmdStr + "]", extra=d)
#system_test_utils.sys_call(cmdStr)
else:
logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
logger.warn("check config file: system_test/cluster_config.properties", extra=d)
logger.warn("aborting test...", extra=d)
sys.exit(1)

View File

@ -30,9 +30,30 @@ class TestcaseEnv():
# Generic testcase environment
# ================================
# dictionary of entity parent pid
# dictionary of entity_id to ppid for entities such as zookeepers & brokers
# key: entity_id
# val: ppid of zk or broker associated to that entity_id
# { 0: 12345, 1: 12389, ... }
entityParentPidDict = {}
# dictionary of entity_id to list of JMX ppid
# key: entity_id
# val: list of JMX ppid associated to that entity_id
# { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
entityJmxParentPidDict = {}
# dictionary of hostname-topic-ppid for consumer
# key: hostname
# val: dict of topic-ppid
# { host1: { test1 : 12345 }, host1: { test2 : 12389 }, ... }
consumerHostParentPidDict = {}
# dictionary of hostname-topic-ppid for producer
# key: hostname
# val: dict of topic-ppid
# { host1: { test1 : 12345 }, host1: { test2 : 12389 }, ... }
producerHostParentPidDict = {}
# list of testcase configs
testcaseConfigsList = []