mirror of https://github.com/apache/kafka.git
				
				
				
			
		
			
				
	
	
		
			530 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			530 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
| # Licensed to the Apache Software Foundation (ASF) under one
 | |
| # or more contributor license agreements.  See the NOTICE file
 | |
| # distributed with this work for additional information
 | |
| # regarding copyright ownership.  The ASF licenses this file
 | |
| # to you under the Apache License, Version 2.0 (the
 | |
| # "License"); you may not use this file except in compliance
 | |
| # with the License.  You may obtain a copy of the License at
 | |
| #
 | |
| #   http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing,
 | |
| # software distributed under the License is distributed on an
 | |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 | |
| # KIND, either express or implied.  See the License for the
 | |
| # specific language governing permissions and limitations
 | |
| # under the License.
 | |
| #!/usr/bin/env python
 | |
| 
 | |
| # ===================================
 | |
| # system_test_utils.py
 | |
| # ===================================
 | |
| 
 | |
| import copy
 | |
| import inspect
 | |
| import json
 | |
| import logging
 | |
| import os
 | |
| import re
 | |
| import signal
 | |
| import socket
 | |
| import subprocess
 | |
| import sys
 | |
| import time
 | |
| 
 | |
# Module-wide loggers. "namedLogger" is the one this module calls with the
# ``extra=d`` mapping defined below; "anonymousLogger" is called without it.
logger = logging.getLogger("namedLogger")
aLogger = logging.getLogger("anonymousLogger")

# Label identifying this module, handed to ``logger`` calls via ``extra=d``.
thisClassName = '(system_test_utils)'
d = dict(name_of_class=thisClassName)
 | |
| 
 | |
| 
 | |
def get_current_unix_timestamp():
    """Return ``time.time()`` formatted as a string with six decimal places."""
    return "%.6f" % time.time()
 | |
| 
 | |
| 
 | |
def get_local_hostname():
    """Return the host name reported by ``socket.gethostname()``."""
    localHostname = socket.gethostname()
    return localHostname
 | |
| 
 | |
| 
 | |
def sys_call(cmdStr):
    """Run ``cmdStr`` through the shell and return its combined output.

    stderr is redirected into stdout, so the returned string holds everything
    the command wrote to either stream. The call blocks until the command
    has exited.

    Args:
        cmdStr: the shell command line to execute.

    Returns:
        The command output as a ``str`` (empty if nothing was written).
    """
    proc = subprocess.Popen(cmdStr, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # communicate() reads to EOF *and* waits for the child, so no zombie
    # process is left behind as with a bare stdout.readlines() loop.
    output = proc.communicate()[0]
    # On Python 3 the pipe yields bytes; on Python 2 bytes is str and this
    # branch is skipped.
    if not isinstance(output, str):
        output = output.decode("utf-8", "replace")
    return output
 | |
| 
 | |
| 
 | |
def remote_async_sys_call(host, cmd):
    """Log and launch ``cmd`` on ``host`` through ssh via ``async_sys_call``.

    The command is wrapped in double quotes and prefixed with ``ssh <host>``.
    Nothing is returned.
    """
    sshCommand = "".join(["ssh ", host, " \"", cmd, "\""])
    logger.info("executing command [" + sshCommand + "]", extra=d)
    async_sys_call(sshCommand)
 | |
| 
 | |
| 
 | |
def remote_sys_call(host, cmd):
    """Log and run ``cmd`` on ``host`` through ssh via ``sys_call``.

    The command is wrapped in double quotes and prefixed with ``ssh <host>``.
    The output returned by ``sys_call`` is discarded; nothing is returned.
    """
    sshCommand = "ssh " + host + ' "' + cmd + '"'
    logger.info("executing command [" + sshCommand + "]", extra=d)
    sys_call(sshCommand)
 | |
| 
 | |
| 
 | |
def get_dir_paths_with_prefix(fullPath, dirNamePrefix):
    """Return absolute paths of non-file entries in ``fullPath`` with a prefix.

    Args:
        fullPath: directory whose entries are listed.
        dirNamePrefix: only entries whose name starts with this are returned.

    Returns:
        A list of absolute paths, in ``os.listdir`` order, of every entry of
        ``fullPath`` that starts with ``dirNamePrefix`` and is not a regular
        file.
    """
    matchingPaths = []
    for entryName in os.listdir(fullPath):
        if not entryName.startswith(dirNamePrefix):
            continue
        # The file test must use the entry's full path; testing the bare name
        # resolves it against the current working directory instead.
        entryPath = os.path.join(fullPath, entryName)
        if not os.path.isfile(entryPath):
            matchingPaths.append(os.path.abspath(entryPath))
    return matchingPaths
 | |
| 
 | |
| 
 | |
def get_testcase_prop_json_pathname(testcasePathName):
    """Return ``<testcasePathName>/<basename>_properties.json``.

    ``<basename>`` is ``os.path.basename(testcasePathName)``.
    """
    propFileName = os.path.basename(testcasePathName) + "_properties.json"
    return testcasePathName + "/" + propFileName
 | |
| 
 | |
def get_json_list_data(infile):
    """Collect every dict found inside the list-valued entries of a JSON file.

    The file must hold a JSON object. For each top-level value that is a
    list, each element that is a dict is shallow-copied into the result;
    non-list values and non-dict elements are ignored.

    Args:
        infile: path of the JSON file to read.

    Returns:
        A list of dicts (copies of the ones found in the file).
    """
    # Use a context manager so the file handle is closed deterministically.
    with open(infile, "r") as jsonFile:
        json_data = json.load(jsonFile)

    data_list = []
    for settings in json_data.values():
        if not isinstance(settings, list):
            continue
        for setting in settings:
            if isinstance(setting, dict):
                data_list.append(dict(setting))
    return data_list
 | |
| 
 | |
| 
 | |
def get_dict_from_list_of_dicts(listOfDicts, lookupKey, lookupVal):
    """Return the dicts in ``listOfDicts`` whose ``lookupKey`` equals ``lookupVal``.

    Sample list of dicts:
      {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
      {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'}

    Usage:
      1. get_dict_from_list_of_dicts(self.clusterConfigsList, "entity_id", "0")
         returns a list holding only the 'zookeeper' dict above.
      2. get_dict_from_list_of_dicts(self.clusterConfigsList, None, None)
         returns a list holding both dicts.

    Args:
        listOfDicts: the dicts to search.
        lookupKey: key to match on, or None for no filtering.
        lookupVal: value ``lookupKey`` must have, or None for no filtering.

    Returns:
        A new list referencing the matching dicts (the dicts are not copied).
    """
    # With no usable filter, return every dict. The previous code compared
    # against an undefined name here and raised NameError.
    if lookupVal is None or lookupKey is None:
        return list(listOfDicts)

    return [entry for entry in listOfDicts
            if lookupKey in entry and entry[lookupKey] == lookupVal]
 | |
| 
 | |
| 
 | |
def get_data_from_list_of_dicts(listOfDicts, lookupKey, lookupVal, fieldToRetrieve):
    """Return the ``fieldToRetrieve`` values of the dicts matching a lookup.

    Sample list of dicts:
      {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
      {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'}

    Usage:
      1. get_data_from_list_of_dicts(self.clusterConfigsList, "entity_id", "0", "role")
         => returns ['zookeeper']
      2. get_data_from_list_of_dicts(self.clusterConfigsList, None, None, "role")
         => returns ['zookeeper', 'broker']

    Args:
        listOfDicts: the dicts to search.
        lookupKey: key to match on, or None to consider every dict.
        lookupVal: value ``lookupKey`` must have, or None to consider every dict.
        fieldToRetrieve: key whose value is collected from each selected dict.

    Returns:
        A list of the collected values, in input order. Dicts that lack
        ``fieldToRetrieve`` contribute nothing.
    """
    if lookupVal is None or lookupKey is None:
        # No filter: take the field from every dict that has it.
        return [entry[fieldToRetrieve] for entry in listOfDicts
                if fieldToRetrieve in entry]

    retList = []
    for entry in listOfDicts:
        if lookupKey in entry and entry[lookupKey] == lookupVal:
            if fieldToRetrieve in entry:
                retList.append(entry[fieldToRetrieve])
            else:
                # Matching dict without the requested field: note it and move on.
                logger.debug("field not found: " + fieldToRetrieve, extra=d)
    return retList
 | |
| 
 | |
def get_data_by_lookup_keyval(listOfDict, lookupKey, lookupVal, fieldToRetrieve):
    """Return the first value found by ``get_data_from_list_of_dicts``.

    The arguments are passed through unchanged. When the lookup yields an
    empty list, the empty string is returned instead.
    """
    matchedValues = get_data_from_list_of_dicts(listOfDict, lookupKey, lookupVal, fieldToRetrieve)
    if len(matchedValues) > 0:
        return matchedValues[0]
    return ""
 | |
| 
 | |
def get_json_dict_data(infile):
    """Return the non-list top-level entries of a JSON object file.

    Args:
        infile: path of a JSON file whose top-level value is an object.

    Returns:
        A dict holding every top-level key whose value is not a list.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open(infile, "r") as jsonFile:
        json_data = json.load(jsonFile)

    data_dict = {}
    for key, val in json_data.items():
        if not isinstance(val, list):
            data_dict[key] = val
    return data_dict
 | |
| 
 | |
def get_remote_child_processes(hostname, pid):
    """Return ``pid`` and the pids found below it on ``hostname``.

    An ssh command runs a small shell loop on the remote host: it echoes
    ``pid``, then repeatedly lists (via ``ps``/``awk``) the processes whose
    parent is the current pid, echoing each and continuing from the last one
    found, until no further child appears.

    Args:
        hostname: host to ssh into.
        pid: starting process id; concatenated into the command, so it must
            be a string.

    Returns:
        A list of pid strings in the order the remote loop printed them.
        Blank output lines are skipped.
    """
    cmdList = ['''ssh ''' + hostname,
              ''''pid=''' + pid + '''; prev_pid=""; echo $pid;''',
              '''while [[ "x$pid" != "x" ]];''',
              '''do prev_pid=$pid;''',
              # Raw literal: the backslashes before $2/$1 must reach the
              # remote shell unchanged.
              r'''  for child in $(ps -o pid,ppid ax | awk "{ if ( \$2 == $pid ) { print \$1 }}");''',
              '''    do echo $child; pid=$child;''',
              '''  done;''',
              '''  if [ $prev_pid == $pid ]; then''',
              '''    break;''',
              '''  fi;''',
              '''done' 2> /dev/null''']

    cmdStr = " ".join(cmdList)
    logger.debug("executing command [" + cmdStr + "]", extra=d)

    subproc = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE)
    # communicate() drains stdout and reaps the ssh process.
    output = subproc.communicate()[0]
    # Bytes on Python 3; on Python 2 bytes is str and this is skipped.
    if not isinstance(output, str):
        output = output.decode("utf-8", "replace")

    # Drop blank lines so callers never receive an empty pid.
    return [line.strip() for line in output.split('\n') if line.strip()]
 | |
| 
 | |
def get_child_processes(pid):
    """Return ``pid`` followed by the pids of all its descendants.

    Descendants are discovered level by level with
    ``ps -o pid --ppid <pid> --noheaders``. Every child reported by ``ps`` is
    recorded separately, so processes with several children are handled.

    Args:
        pid: starting process id as a string.

    Returns:
        A list of pid strings: ``pid`` first, then descendants, with every
        parent placed before its children (so popping from the end reaches
        children before their parents).
    """
    pidStack = [pid]
    pending = [pid]
    seen = set([pid])  # guards against visiting the same pid twice

    while pending:
        currentPid = pending.pop(0)
        if len(currentPid) == 0:
            continue

        psCommand = subprocess.Popen("ps -o pid --ppid %s --noheaders" % currentPid,
                                     shell=True, stdout=subprocess.PIPE)
        # communicate() drains stdout and reaps the ps process.
        psOutput = psCommand.communicate()[0]
        # Bytes on Python 3; on Python 2 bytes is str and this is skipped.
        if not isinstance(psOutput, str):
            psOutput = psOutput.decode("utf-8", "replace")

        # ps prints one pid per line; split() also strips the padding.
        for childPid in psOutput.split():
            if childPid not in seen:
                seen.add(childPid)
                pidStack.append(childPid)
                pending.append(childPid)
    return pidStack
 | |
| 
 | |
| 
 | |
def sigterm_remote_process(hostname, pidStack):
    """Issue ``kill -15`` over ssh for every pid in ``pidStack``.

    Pids are popped from the end of ``pidStack`` until it is empty, so the
    caller's list is consumed. Each ssh command is started through
    ``sys_call_return_subproc``; its result is not inspected. If starting a
    command raises, a warning is printed and the exception is re-raised.
    """
    while pidStack:
        targetPid = pidStack.pop()
        killCommand = "ssh " + hostname + " 'kill -15 " + targetPid + "'"

        try:
            logger.debug("executing command [" + killCommand + "]", extra=d)
            sys_call_return_subproc(killCommand)
        except:
            print("WARN - pid: %s not found" % (targetPid,))
            raise
 | |
| 
 | |
| 
 | |
def sigkill_remote_process(hostname, pidStack):
    """Issue ``kill -9`` over ssh for every pid in ``pidStack``.

    Pids are popped from the end of ``pidStack`` until it is empty, so the
    caller's list is consumed. Each ssh command is started through
    ``sys_call_return_subproc``; its result is not inspected. If starting a
    command raises, a warning is printed and the exception is re-raised.
    """
    while pidStack:
        targetPid = pidStack.pop()
        killCommand = "ssh " + hostname + " 'kill -9 " + targetPid + "'"

        try:
            logger.debug("executing command [" + killCommand + "]", extra=d)
            sys_call_return_subproc(killCommand)
        except:
            print("WARN - pid: %s not found" % (targetPid,))
            raise
 | |
| 
 | |
| 
 | |
def terminate_process(pidStack):
    """Send SIGTERM to every local pid in ``pidStack``.

    Pids are popped from the end of ``pidStack`` until it is empty and are
    converted with ``int()`` before ``os.kill``. If the conversion or the
    signal fails, a warning is printed and the exception is re-raised.
    """
    while pidStack:
        targetPid = pidStack.pop()
        try:
            os.kill(int(targetPid), signal.SIGTERM)
        except:
            print("WARN - pid: %s not found" % (targetPid,))
            raise
 | |
| 
 | |
| 
 | |
def convert_keyval_to_cmd_args(configFilePathname):
    """Turn a ``key=value`` file into a ``--key value`` argument string.

    Each non-blank line is right-stripped and split on its first ``=``:
    ``key=value`` becomes `` --key value`` and a line without ``=`` becomes
    `` --key``. Blank lines are skipped.

    Args:
        configFilePathname: path of the file to read.

    Returns:
        The concatenated arguments; each one is preceded by a single space.
        An empty file yields the empty string.
    """
    cmdArgs = []
    # Use a context manager so the file handle is closed deterministically.
    with open(configFilePathname, "r") as configFile:
        for inline in configFile:
            line = inline.rstrip()
            if not line:
                # A blank line would otherwise produce a bare " --" argument.
                continue
            key, separator, value = line.partition('=')
            if separator:
                cmdArgs.append(" --" + key + " " + value)
            else:
                cmdArgs.append(" --" + key)
    return "".join(cmdArgs)
 | |
| 
 | |
| 
 | |
def async_sys_call(cmd_str):
    """Start ``cmd_str`` in a shell and return without waiting for it.

    stdout is piped and stderr is merged into it; the process handle is not
    kept, so the output is never read. Nothing is returned.
    """
    popenOptions = dict(shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    subprocess.Popen(cmd_str, **popenOptions)
 | |
| 
 | |
| 
 | |
def sys_call_return_subproc(cmd_str):
    """Start ``cmd_str`` in a shell and return the ``subprocess.Popen`` object.

    stdout is piped and stderr is merged into it; reading the output and
    waiting for the process are left to the caller.
    """
    return subprocess.Popen(cmd_str, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
 | |
| 
 | |
| 
 | |
def remote_host_file_exists(hostname, pathname):
    """Report whether ``ls <pathname>`` on ``hostname`` finds the path.

    The command is run over ssh. False is returned when any output line
    contains "No such file or directory"; otherwise True.
    """
    lsCommand = "ssh " + hostname + " 'ls " + pathname + "'"
    logger.debug("executing command: [" + lsCommand + "]", extra=d)
    outputLines = sys_call_return_subproc(lsCommand).stdout.readlines()

    notFound = any("No such file or directory" in outputLine
                   for outputLine in outputLines)
    return not notFound
 | |
| 
 | |
| 
 | |
def remote_host_directory_exists(hostname, path):
    """Report whether ``ls -d <path>`` on ``hostname`` finds the path.

    The command is run over ssh. False is returned when any output line
    contains "No such file or directory"; otherwise True.
    """
    lsCommand = "ssh " + hostname + " 'ls -d " + path + "'"
    logger.debug("executing command: [" + lsCommand + "]", extra=d)
    outputLines = sys_call_return_subproc(lsCommand).stdout.readlines()

    notFound = any("No such file or directory" in outputLine
                   for outputLine in outputLines)
    return not notFound
 | |
| 
 | |
| 
 | |
def remote_host_processes_stopped(hostname):
    """Return True when ``hostname`` reports zero matching processes.

    Over ssh, ``ps auxw`` is filtered (excluding grep itself and lines
    containing "Bootstrap") for java/run-/producer/consumer/jmxtool/kafka,
    case-insensitively, and the matches are counted with ``wc -l``. Each
    output line is logged; True is returned as soon as a line equals '0',
    otherwise False.
    """
    # The grep pattern is a raw literal so its backslashes are kept verbatim.
    remotePipeline = (' "ps auxw | grep -v grep | grep -v Bootstrap | grep -i '
                      r"'java\|run\-\|producer\|consumer\|jmxtool\|kafka'"
                      ' | wc -l" 2> /dev/null')
    cmdStr = "ssh " + hostname + remotePipeline

    logger.info("executing command: [" + cmdStr + "]", extra=d)
    subproc = sys_call_return_subproc(cmdStr)

    for countLine in subproc.stdout.readlines():
        processCount = countLine.rstrip('\n')
        logger.info("no. of running processes found : [" + processCount + "]", extra=d)
        if processCount == '0':
            return True
    return False
 | |
| 
 | |
| 
 | |
def setup_remote_hosts(systemTestEnv):
    """Sanity-check every host listed in the cluster configuration.

    For each entry of ``systemTestEnv.clusterEntityConfigDictList``:
      - on "localhost", a "default" ``java_home`` / ``kafka_home`` is replaced
        in place by the locally detected Java home (from ``which java``) or
        the parent directory of ``systemTestEnv.SYSTEM_TEST_BASE_DIR``;
      - ``java_home`` must exist on the host;
      - ``kafka_home`` must exist on localhost; on any other host a missing
        ``kafka_home`` triggers a copy of the local Kafka tree to it.

    Args:
        systemTestEnv: object providing ``clusterEntityConfigDictList`` and
            ``SYSTEM_TEST_BASE_DIR``.

    Returns:
        True when all checks pass; False when no usable local Java binary is
        found or a required directory is missing.
    """
    aLogger.info("=================================================")
    aLogger.info("setting up remote hosts ...")
    aLogger.info("=================================================")

    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
    localJavaBin   = ""
    localJavaHome  = ""

    subproc = sys_call_return_subproc("which java")
    for line in subproc.stdout.readlines():
        if line.startswith("which: no "):
            logger.error("No Java binary found in local host", extra=d)
            return False

        line = line.rstrip('\n')
        matchObj = re.match(r"(.*)\/bin\/java$", line)
        if matchObj is None:
            # Output that is not a ".../bin/java" path would otherwise crash
            # on matchObj.group(); treat it as "no usable Java".
            logger.error("Unexpected output from 'which java': [" + line + "]", extra=d)
            return False
        localJavaBin  = line
        localJavaHome = matchObj.group(1)

    for clusterEntityConfigDict in clusterEntityConfigDictList:
        hostname  = clusterEntityConfigDict["hostname"]
        kafkaHome = clusterEntityConfigDict["kafka_home"]
        javaHome  = clusterEntityConfigDict["java_home"]

        # Resolve "default" placeholders for local entities; the dict is the
        # list element itself, so the update is visible through the list.
        if hostname == "localhost" and javaHome == "default":
            clusterEntityConfigDict["java_home"] = localJavaHome

        if hostname == "localhost" and kafkaHome == "default":
            clusterEntityConfigDict["kafka_home"] = localKafkaHome

        # Re-read the values, which may just have been replaced.
        kafkaHome = clusterEntityConfigDict["kafka_home"]
        javaHome  = clusterEntityConfigDict["java_home"]

        logger.debug("checking java binary [" + localJavaBin + "] in host [" + hostname + "]", extra=d)
        if not remote_host_directory_exists(hostname, javaHome):
            logger.error("Directory not found: [" + javaHome + "] in host [" + hostname + "]", extra=d)
            return False

        logger.debug("checking directory [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
        if not remote_host_directory_exists(hostname, kafkaHome):
            logger.info("Directory not found: [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
            if hostname == "localhost":
                return False

            # Remote host without a Kafka tree: push the local one to it.
            localKafkaSourcePath = systemTestEnv.SYSTEM_TEST_BASE_DIR + "/.."
            logger.debug("copying local copy of [" + localKafkaSourcePath + "] to " + hostname + ":" + kafkaHome, extra=d)
            copy_source_to_remote_hosts(hostname, localKafkaSourcePath, kafkaHome)

    return True
 | |
| 
 | |
def copy_source_to_remote_hosts(hostname, sourceDir, destDir):
    """Mirror ``sourceDir`` to ``hostname:destDir`` using rsync.

    Runs ``rsync -avz --delete-before <sourceDir>/ <hostname>:<destDir>`` and
    reads its output to the end before returning. Nothing is returned.
    """
    rsyncCommand = "".join(["rsync -avz --delete-before ", sourceDir, "/ ", hostname, ":", destDir])
    logger.info("executing command [" + rsyncCommand + "]", extra=d)
    rsyncProc = sys_call_return_subproc(rsyncCommand)

    # Reading to EOF blocks until rsync closes its output; the text itself
    # is discarded.
    rsyncProc.stdout.readlines()
 | |
| 
 | |
| 
 | |
def remove_kafka_home_dir_at_remote_hosts(hostname, kafkaHome):
    """Prepare removal of ``kafkaHome`` on ``hostname``, guarding against mistakes.

    The directory is only touched when it contains ``bin/kafka-run-class.sh``.
    In that case it is made world-writable recursively (``chmod -R 777``) and
    the ``rm -rf`` command is logged. Otherwise warnings are logged and the
    program exits with status 1.

    Args:
        hostname: host to ssh into.
        kafkaHome: directory on that host.
    """
    # Built up front so the warning branch can report it; previously the
    # name was unbound there and raised UnboundLocalError.
    removeCmdStr = "ssh " + hostname + " 'rm -rf " + kafkaHome + "'"

    if not remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
        logger.warning("possible destructive command [" + removeCmdStr + "]", extra=d)
        logger.warning("check config file: system_test/cluster_config.properties", extra=d)
        logger.warning("aborting test...", extra=d)
        sys.exit(1)

    chmodCmdStr = "ssh " + hostname + " 'chmod -R 777 " + kafkaHome + "'"
    logger.info("executing command [" + chmodCmdStr + "]", extra=d)
    sys_call(chmodCmdStr)

    # NOTE(review): the rm -rf is only logged, never executed -- the
    # sys_call for it was commented out in the original. Left disabled;
    # confirm whether that is intentional.
    logger.info("executing command [" + removeCmdStr + "]", extra=d)
 | |
| 
 | |
def get_md5_for_file(filePathName, blockSize=8192):
    """Return the raw MD5 digest of a file, reading it in blocks.

    Args:
        filePathName: path of the file to hash.
        blockSize: number of bytes read per iteration.

    Returns:
        The digest as returned by ``hashlib.md5().digest()`` (raw bytes, not
        a hex string).
    """
    # hashlib is not among the imports at the top of this file, so bring it
    # into scope here.
    import hashlib

    md5 = hashlib.md5()
    # Use a context manager so the file handle is closed deterministically.
    with open(filePathName, 'rb') as f:
        # iter() with a sentinel stops at the first empty read (EOF).
        for data in iter(lambda: f.read(blockSize), b''):
            md5.update(data)
    return md5.digest()
 | |
| 
 | |
def load_cluster_config(clusterConfigPathName, clusterEntityConfigDictList):
    """Replace the contents of a list with the entities of a cluster config file.

    The list is emptied in place, then every element under the
    "cluster_config" key of the JSON file is appended to it. Other keys are
    ignored. Nothing is returned.

    Args:
        clusterConfigPathName: path of the cluster config JSON file.
        clusterEntityConfigDictList: list to refill; modified in place so
            existing references to it see the new contents.
    """
    # empty the list
    del clusterEntityConfigDictList[:]

    # Use a context manager so the file handle is closed deterministically.
    with open(clusterConfigPathName, "r") as clusterConfigFile:
        jsonData = json.load(clusterConfigFile)

    for key, cfgList in jsonData.items():
        if key == "cluster_config":
            clusterEntityConfigDictList.extend(cfgList)
 | |
| 
 | |
def setup_remote_hosts_with_testcase_level_cluster_config(systemTestEnv, testCasePathName):
    """Select the cluster config for a new testcase, then set up the hosts.

    Selection order:
      1. <testCasePathName>/cluster_config.json exists
         => load it into systemTestEnv.clusterEntityConfigDictList and keep a
            deep copy in clusterEntityConfigDictListLastFoundInTestCase
      2. otherwise, clusterEntityConfigDictListLastFoundInTestSuite is non-empty
         => use a deep copy of it
      3. otherwise
         => use a deep copy of clusterEntityConfigDictListInSystemTestLevel

    Afterwards ``setup_remote_hosts`` is run; if it fails the program exits
    with status 1. A blank line is printed in either case.
    """
    testCaseConfigPath = testCasePathName + "/cluster_config.json"

    if os.path.isfile(testCaseConfigPath):
        logger.info("found a new cluster_config : " + testCaseConfigPath, extra=d)

        # clear the current list in place, then refill it from the file
        del systemTestEnv.clusterEntityConfigDictList[:]
        load_cluster_config(testCaseConfigPath, systemTestEnv.clusterEntityConfigDictList)

        # remember this testcase-level config
        systemTestEnv.clusterEntityConfigDictListLastFoundInTestCase = copy.deepcopy(systemTestEnv.clusterEntityConfigDictList)
    else:
        useTestSuiteLevel = len(systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite) > 0

        # clear the current list in place before rebinding the attribute
        del systemTestEnv.clusterEntityConfigDictList[:]

        if useTestSuiteLevel:
            # fall back to the config last found at testsuite level
            fallbackConfig = systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite
        else:
            # fall back to the system_test level config
            fallbackConfig = systemTestEnv.clusterEntityConfigDictListInSystemTestLevel
        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(fallbackConfig)

    # set up remote hosts
    hostsReady = setup_remote_hosts(systemTestEnv)
    if not hostsReady:
        logger.error("Remote hosts sanity check failed. Aborting test ...", extra=d)
        print("")
        sys.exit(1)
    print("")
 | |
| 
 | |
def setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testModulePathName):
    """Select the cluster config for a new testsuite, then set up the hosts.

    Selection order:
      1. <testModulePathName>/cluster_config.json exists
         => load it into systemTestEnv.clusterEntityConfigDictList and keep a
            deep copy in clusterEntityConfigDictListLastFoundInTestSuite
      2. otherwise
         => empty clusterEntityConfigDictListLastFoundInTestSuite and use a
            deep copy of clusterEntityConfigDictListInSystemTestLevel

    Afterwards ``setup_remote_hosts`` is run; if it fails the program exits
    with status 1. A blank line is printed in either case.
    """
    testSuiteConfigPath = testModulePathName + "/cluster_config.json"

    if not os.path.isfile(testSuiteConfigPath):
        # no testsuite-level file: forget the previous testsuite-level config
        del systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite[:]

        # clear the current list in place before rebinding the attribute
        del systemTestEnv.clusterEntityConfigDictList[:]

        # fall back to the system_test level config
        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListInSystemTestLevel)
    else:
        logger.info("found a new cluster_config : " + testSuiteConfigPath, extra=d)

        # clear the current list in place, then refill it from the file
        del systemTestEnv.clusterEntityConfigDictList[:]
        load_cluster_config(testSuiteConfigPath, systemTestEnv.clusterEntityConfigDictList)

        # remember this testsuite-level config
        systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite = copy.deepcopy(systemTestEnv.clusterEntityConfigDictList)

    # set up remote hosts
    hostsReady = setup_remote_hosts(systemTestEnv)
    if not hostsReady:
        logger.error("Remote hosts sanity check failed. Aborting test ...", extra=d)
        print("")
        sys.exit(1)
    print("")
 | |
| 
 | |
| 
 | |
| 
 |