mirror of https://github.com/apache/kafka.git
				
				
				
			
		
			
				
	
	
		
			459 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			459 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
# Licensed to the Apache Software Foundation (ASF) under one
 | 
						|
# or more contributor license agreements.  See the NOTICE file
 | 
						|
# distributed with this work for additional information
 | 
						|
# regarding copyright ownership.  The ASF licenses this file
 | 
						|
# to you under the Apache License, Version 2.0 (the
 | 
						|
# "License"); you may not use this file except in compliance
 | 
						|
# with the License.  You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#   http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing,
 | 
						|
# software distributed under the License is distributed on an
 | 
						|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 | 
						|
# KIND, either express or implied.  See the License for the
 | 
						|
# specific language governing permissions and limitations
 | 
						|
# under the License.
 | 
						|
#!/usr/bin/env python
 | 
						|
 | 
						|
# ===================================
 | 
						|
# replica_basic_test.py
 | 
						|
# ===================================
 | 
						|
 | 
						|
import inspect
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import pprint
 | 
						|
import signal
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import traceback
 | 
						|
 | 
						|
from   system_test_env    import SystemTestEnv
 | 
						|
sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
 | 
						|
 | 
						|
from   setup_utils        import SetupUtils
 | 
						|
from   replication_utils  import ReplicationUtils
 | 
						|
import system_test_utils
 | 
						|
from   testcase_env       import TestcaseEnv
 | 
						|
 | 
						|
# product specific: Kafka
 | 
						|
import kafka_system_test_utils
 | 
						|
import metrics
 | 
						|
 | 
						|
class ReplicaBasicTest(ReplicationUtils, SetupUtils):
 | 
						|
 | 
						|
    testModuleAbsPathName = os.path.realpath(__file__)
 | 
						|
    testSuiteAbsPathName  = os.path.abspath(os.path.dirname(testModuleAbsPathName))
 | 
						|
 | 
						|
    def __init__(self, systemTestEnv):
        """Store the cluster level environment and initialise the base classes.

        systemTestEnv -- SystemTestEnv instance that provides cluster level
            environment settings such as entity_id, hostname, kafka_home and
            java_home, which are available in a list of dictionaries named
            "clusterEntityConfigDictList".
        """
        self.systemTestEnv = systemTestEnv

        # this test instance is passed on as the base-class initialiser argument
        super(ReplicaBasicTest, self).__init__(self)

        # dict to pass user-defined attributes to logger argument: "extra".
        # It is kept on the instance (not in a throw-away local) because
        # runTest hands "self.d" to self.logger.info(..., extra=self.d).
        self.d = {'name_of_class': self.__class__.__name__}
 | 
						|
 | 
						|
    def signal_handler(self, signal, frame):
        """Handle Ctrl+c: log it, stop every remote entity, then exit with 1.

        This method is installed for SIGINT by runTest. The "signal" and
        "frame" arguments are part of the handler signature and are not
        used by the body.
        """
        notices = (
            "Interrupt detected - User pressed Ctrl+c",
            # the cleanup below happens on Ctrl+c and may be product specific
            "stopping all entities - please wait ...",
        )
        for notice in notices:
            self.log_message(notice)

        kafka_system_test_utils.stop_all_remote_running_processes(
            self.systemTestEnv, self.testcaseEnv)

        sys.exit(1)
 | 
						|
 | 
						|
    def runTest(self):
        """Run every testcase directory of this test suite, one after another.

        For each directory returned by
        system_test_utils.get_dir_paths_with_prefix() a fresh TestcaseEnv is
        created. The testcase is then either only described (when
        systemTestEnv.printTestDescriptionsOnly is set), skipped (when
        systemTestEnv.isTestCaseToSkip() says so) or executed.

        An executed testcase starts zookeepers and brokers, optionally creates
        topics, starts a background producer, performs "num_iteration" rounds
        of broker bounce (leader / follower / controller) or GC pause
        simulation, stops the producer, starts the consumers, stops all
        entities, collects the logs, validates the data and finally draws the
        metrics graphs and dashboards.

        Any exception raised while running a testcase is logged together with
        its traceback and the next testcase is started. Remote processes are
        stopped in the "finally" clause unless the testcase was skipped or
        only described.
        """

        # ======================================================================
        # get all testcase directories under this testsuite
        # ======================================================================
        testCasePathNameList = system_test_utils.get_dir_paths_with_prefix(
            self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
        testCasePathNameList.sort()

        replicationUtils = ReplicationUtils(self)

        # =============================================================
        # launch each testcase one by one: testcase_1, testcase_2, ...
        # =============================================================
        for testCasePathName in testCasePathNameList:

            skipThisTestCase = False

            try:
                # ======================================================================
                # A new instance of TestcaseEnv to keep track of this testcase's env vars
                # and initialize some env vars as testCasePathName is available now
                # ======================================================================
                self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
                self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
                self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName)
                self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"]

                # ======================================================================
                # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json
                # ======================================================================
                testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"]

                if self.systemTestEnv.printTestDescriptionsOnly:
                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
                    continue
                elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName):
                    self.log_message("Skipping : " + testcaseDirName)
                    skipThisTestCase = True
                    continue
                else:
                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
                    system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName)

                # ============================================================================== #
                # ============================================================================== #
                #                   Product Specific Testing Code Starts Here:                   #
                # ============================================================================== #
                # ============================================================================== #

                # get optional testcase arguments; each one falls back to "false"
                # when it is absent from the testcase arguments
                logRetentionTest        = self.testcaseEnv.testcaseArgumentsDict.get("log_retention_test", "false")
                consumerMultiTopicsMode = self.testcaseEnv.testcaseArgumentsDict.get("consumer_multi_topics_mode", "false")
                autoCreateTopic         = self.testcaseEnv.testcaseArgumentsDict.get("auto_create_topic", "false")

                # initialize self.testcaseEnv with user-defined environment variables (product specific)
                self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]    = False
                self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False
                self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"] = []

                # initialize signal handler
                signal.signal(signal.SIGINT, self.signal_handler)

                # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file:
                #   system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
                self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
                    self.testcaseEnv.testcasePropJsonPathName)

                # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
                kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)

                # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase
                # for collecting logs from remote machines
                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)

                # TestcaseEnv - initialize producer & consumer config / log file pathnames
                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)

                # generate remote hosts log/config dirs if not exist
                kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)

                # generate properties files for zookeeper, kafka, producer, consumer:
                # 1. copy system_test/<suite_name>_testsuite/config/*.properties to
                #    system_test/<suite_name>_testsuite/testcase_<n>/config/
                # 2. update all properties files in system_test/<suite_name>_testsuite/testcase_<n>/config
                #    by overriding the settings specified in:
                #    system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
                kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName,
                    self.testcaseEnv, self.systemTestEnv)

                # =============================================
                # preparing all entities to start the test
                # =============================================
                self.log_message("starting zookeepers")
                kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv)
                self.anonLogger.info("sleeping for 2s")
                time.sleep(2)

                self.log_message("starting brokers")
                kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
                self.anonLogger.info("sleeping for 5s")
                time.sleep(5)

                if autoCreateTopic.lower() == "false":
                    self.log_message("creating topics")
                    kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
                    self.anonLogger.info("sleeping for 5s")
                    time.sleep(5)

                # =============================================
                # start ConsoleConsumer if this is a Log Retention test
                # =============================================
                if logRetentionTest.lower() == "true":
                    self.log_message("starting consumer in the background")
                    kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
                    time.sleep(1)

                # =============================================
                # starting producer
                # =============================================
                self.log_message("starting producer in the background")
                kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False)
                msgProducingFreeTimeSec = self.testcaseEnv.testcaseArgumentsDict["message_producing_free_time_sec"]
                self.anonLogger.info("sleeping for " + msgProducingFreeTimeSec + " sec to produce some messages")
                time.sleep(int(msgProducingFreeTimeSec))

                # =============================================
                # A loop to bounce the broker as specified
                # by "num_iteration" in testcase_n_properties.json
                # =============================================
                numIterations    = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"])
                brokerType       = self.testcaseEnv.testcaseArgumentsDict["broker_type"]
                bounceBrokerFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_broker"]

                for i in range(1, numIterations + 1):
                    self.log_message("Iteration " + str(i) + " of " + str(numIterations))
                    self.log_message("bounce_broker flag : " + bounceBrokerFlag)

                    leaderDict            = None
                    controllerDict        = None
                    stoppedBrokerEntityId = ""

                    # ==============================================
                    # Find out the entity id for the stopping broker
                    # ==============================================
                    if brokerType == "leader" or brokerType == "follower":
                        self.log_message("looking up leader")
                        leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv, self.leaderAttributesDict)

                        # ==========================
                        # leaderDict looks like this:
                        # ==========================
                        #{'entity_id': u'3',
                        # 'partition': '0',
                        # 'timestamp': 1345050255.8280001,
                        # 'hostname': u'localhost',
                        # 'topic': 'test_1',
                        # 'brokerid': '3'}

                        if brokerType == "leader":
                            stoppedBrokerEntityId = leaderDict["entity_id"]
                            self.log_message("Found leader with entity id: " + stoppedBrokerEntityId)
                        else: # Follower
                            self.log_message("looking up follower")
                            # a list of all brokers
                            brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(self.systemTestEnv.clusterEntityConfigDictList, "role", "broker", "entity_id")

                            # we pick the first non-leader broker as the follower
                            firstFollowerEntityId = None
                            for brokerEntityId in brokerEntityIdList:
                                if brokerEntityId != leaderDict["entity_id"]:
                                    firstFollowerEntityId = brokerEntityId
                                    break

                            # fail with a readable message instead of the TypeError
                            # that concatenating None to a string would raise
                            if firstFollowerEntityId is None:
                                raise RuntimeError("no follower found: every broker entity id equals the leader entity id "
                                    + str(leaderDict["entity_id"]))

                            stoppedBrokerEntityId  = firstFollowerEntityId
                            self.log_message("Found follower with entity id: " + stoppedBrokerEntityId)

                    elif brokerType == "controller":
                        self.log_message("looking up controller")
                        controllerDict = kafka_system_test_utils.get_controller_attributes(self.systemTestEnv, self.testcaseEnv)

                        # ==========================
                        # controllerDict looks like this:
                        # ==========================
                        #{'entity_id': u'3',
                        # 'timestamp': 1345050255.8280001,
                        # 'hostname': u'localhost',
                        # 'brokerid': '3'}

                        stoppedBrokerEntityId = controllerDict["entity_id"]
                        self.log_message("Found controller with entity id: " + stoppedBrokerEntityId)

                    # =============================================
                    # Bounce the broker
                    # =============================================
                    if bounceBrokerFlag.lower() == "true":
                        if brokerType == "leader":
                            # validate to see if leader election is successful
                            self.log_message("validating leader election")
                            kafka_system_test_utils.validate_leader_election_successful(self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)

                            # trigger leader re-election by stopping leader to get re-election latency
                            reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
                            latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
                            # latency is reported in milliseconds with two decimals
                            latencyInMs = "{0:.2f}".format(reelectionLatency * 1000)
                            self.testcaseEnv.validationStatusDict[latencyKeyName] = latencyInMs + " ms"
                            self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append(latencyInMs)

                        elif brokerType == "follower":
                            # stopping Follower (stoppedBrokerEntityId is the follower picked above)
                            self.log_message("stopping follower with entity id: " + stoppedBrokerEntityId)
                            kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, stoppedBrokerEntityId, self.testcaseEnv.entityBrokerParentPidDict[stoppedBrokerEntityId])

                        elif brokerType == "controller":
                            # stopping Controller
                            self.log_message("stopping controller : " + controllerDict["brokerid"])
                            kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, controllerDict["entity_id"], self.testcaseEnv.entityBrokerParentPidDict[controllerDict["entity_id"]])

                        # optional argument: keep the default of 5 seconds when it is
                        # missing or cannot be converted to an integer
                        brokerDownTimeInSec = 5
                        try:
                            brokerDownTimeInSec = int(self.testcaseEnv.testcaseArgumentsDict["broker_down_time_in_sec"])
                        except (KeyError, TypeError, ValueError):
                            pass # take default
                        time.sleep(brokerDownTimeInSec)

                        # starting previously terminated broker
                        self.log_message("starting the previously terminated broker")
                        kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedBrokerEntityId)

                    else:
                        # GC Pause simulation (best effort): a failure here must not
                        # abort the testcase. Only "Exception" is caught so that the
                        # SystemExit raised by signal_handler on Ctrl+c still
                        # propagates instead of being swallowed.
                        try:
                            hostname  = leaderDict["hostname"]
                            pauseTime = self.testcaseEnv.testcaseArgumentsDict["pause_time_in_seconds"]
                            parentPid = self.testcaseEnv.entityBrokerParentPidDict[leaderDict["entity_id"]]
                            pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
                            system_test_utils.simulate_garbage_collection_pause_in_remote_process(hostname, pidStack, pauseTime)
                        except Exception as e:
                            self.anonLogger.info("GC pause simulation not performed : {0}".format(e))

                    self.anonLogger.info("sleeping for 15s")
                    time.sleep(15)
                # end of bounce loop

                # update Leader Election Latency MIN/MAX to testcaseEnv.validationStatusDict;
                # both stay None when no latency was recorded. The list holds
                # formatted strings, so they are compared by their numeric value
                # (a plain string comparison would rank "10.20" below "9.50").
                leaderElectionLatencyList = self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"]
                self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None
                self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None
                if leaderElectionLatencyList:
                    self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \
                        min(leaderElectionLatencyList, key=float)
                    self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \
                        max(leaderElectionLatencyList, key=float)

                # =============================================
                # tell producer to stop
                # =============================================
                self.testcaseEnv.lock.acquire()
                try:
                    self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
                    time.sleep(1)
                finally:
                    # always give the lock back, even if an exception is raised
                    self.testcaseEnv.lock.release()
                time.sleep(1)

                # =============================================
                # wait for producer thread's update of
                # "backgroundProducerStopped" to be "True"
                # =============================================
                while True:
                    self.testcaseEnv.lock.acquire()
                    try:
                        backgroundProducerStopped = self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]
                        self.logger.info("status of backgroundProducerStopped : [" + \
                            str(backgroundProducerStopped) + "]", extra=self.d)
                        time.sleep(1)
                    finally:
                        # release on every path, including the one that leaves
                        # the loop, so the lock is never left held
                        self.testcaseEnv.lock.release()

                    if backgroundProducerStopped:
                        self.logger.info("all producer threads completed", extra=self.d)
                        break
                    time.sleep(2)

                # =============================================
                # collect logs from remote hosts to find the
                # minimum common offset of a certain log
                # segment file among all replicas
                # =============================================
                minStartingOffsetDict = None
                if logRetentionTest.lower() == "true":
                    self.anonLogger.info("sleeping for 60s to make sure log truncation is completed")
                    time.sleep(60)
                    kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)

                    minStartingOffsetDict = kafka_system_test_utils.getMinCommonStartingOffset(self.systemTestEnv, self.testcaseEnv)
                    # blank separator line; written so that it is emitted by both
                    # the print statement and the print function
                    print("")
                    pprint.pprint(minStartingOffsetDict)

                # =============================================
                # starting debug consumer
                # =============================================
                if consumerMultiTopicsMode.lower() == "false":
                    self.log_message("starting debug consumers in the background")
                    kafka_system_test_utils.start_simple_consumer(self.systemTestEnv, self.testcaseEnv, minStartingOffsetDict)
                    self.anonLogger.info("sleeping for 10s")
                    time.sleep(10)

                # =============================================
                # starting console consumer
                # =============================================
                if logRetentionTest.lower() == "false":
                    self.log_message("starting consumer in the background")
                    kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
                    time.sleep(10)

                # =============================================
                # this testcase is completed - stop all entities
                # =============================================
                self.log_message("stopping all entities")
                for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items():
                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)

                for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items():
                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)

                # make sure all entities are stopped
                kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)

                # =============================================
                # collect logs from remote hosts
                # =============================================
                kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)

                # =============================================
                # validate the data matched and checksum
                # =============================================
                self.log_message("validating data matched")

                if logRetentionTest.lower() == "true":
                    kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils)
                elif consumerMultiTopicsMode.lower() == "true":
                    kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(
                        self.systemTestEnv, self.testcaseEnv, replicationUtils)
                else:
                    kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
                    kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
                    kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils)

                kafka_system_test_utils.validate_index_log(self.systemTestEnv, self.testcaseEnv)

                # =============================================
                # draw graphs
                # =============================================
                metrics.draw_all_graphs(self.systemTestEnv.METRICS_PATHNAME,
                                        self.testcaseEnv,
                                        self.systemTestEnv.clusterEntityConfigDictList)

                # build dashboard, one for each role
                metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME,
                                             self.testcaseEnv.testCaseDashboardsDir,
                                             self.systemTestEnv.clusterEntityConfigDictList)
            except Exception as e:
                # a failing testcase is reported and the loop moves on to the next one
                self.log_message("Exception while running test {0}".format(e))
                traceback.print_exc()

            finally:
                # nothing was started for skipped or description-only testcases
                if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly:
                    self.log_message("stopping all entities - please wait ...")
                    kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
 | 
						|
 |