kafka-879; In system test, read the new leader from zookeeper instead of broker log on completion of become-leader state transition; patched by John Fung; reviewed by Jun Rao

This commit is contained in:
John Fung 2013-07-23 09:38:52 -07:00 committed by Jun Rao
parent 3817857b15
commit d3aa3ef073
2 changed files with 75 additions and 18 deletions

View File

@@ -231,7 +231,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
# ==============================================
if brokerType == "leader" or brokerType == "follower":
self.log_message("looking up leader")
leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv, self.leaderAttributesDict)
leaderDict = kafka_system_test_utils.get_leader_attributes(self.systemTestEnv, self.testcaseEnv)
# ==========================
# leaderDict looks like this:
@@ -285,10 +285,10 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
kafka_system_test_utils.validate_leader_election_successful(self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
# trigger leader re-election by stopping leader to get re-election latency
reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000))
#reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
#latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
#self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
#self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000))
elif brokerType == "follower":
# stopping Follower
@@ -330,19 +330,19 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
# while loop
# update Leader Election Latency MIN/MAX to testcaseEnv.validationStatusDict
self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None
try:
self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \
min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
except:
pass
self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None
try:
self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \
max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
except:
pass
#self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None
#try:
# self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \
# min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
#except:
# pass
#
#self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None
#try:
# self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \
# max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
#except:
# pass
# =============================================
# tell producer to stop

View File

@@ -2211,3 +2211,60 @@ def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"):
else:
validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "FAILED"
def get_leader_attributes(systemTestEnv, testcaseEnv):
    """Query ZooKeeper for the leader of partition 0 of the first producer topic.

    Runs the ZooKeeper command-line client over ssh on the host of the first
    entity whose role is "zookeeper", issues
    ``get /brokers/topics/<topic>/partitions/0/state`` and parses the returned
    JSON line that contains a "leader" field.

    Args:
        systemTestEnv: supplies ``clusterEntityConfigDictList``.
        testcaseEnv: supplies ``testcaseConfigsList``, ``producerTopicsString``
            and ``userDefinedEnvVarDict["sourceZkConnectStr"]``.

    Returns:
        A dict with the keys "brokerid", "topic", "partition", "entity_id"
        and "hostname". The dict is empty when the command output contains no
        line with a "leader" field.
    """
    logger.info("Querying Zookeeper for leader info ...", extra=d)

    # keep track of leader data in this dict such as broker id & entity id
    leaderDict = {}

    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
    tcConfigsList = testcaseEnv.testcaseConfigsList

    # the query is executed on the host of the first zookeeper entity
    zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
    firstZkDict = zkDictList[0]
    hostname = firstZkDict["hostname"]
    zkEntityId = firstZkDict["entity_id"]
    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home")
    javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home")
    kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"

    # this should have been updated in start_producer_in_thread
    producerTopicsString = testcaseEnv.producerTopicsString
    topics = producerTopicsString.split(',')
    zkQueryStr = "get /brokers/topics/" + topics[0] + "/partitions/0/state"
    brokerid = ''

    # stderr is discarded and "tail -1" keeps only the last line of the
    # client's output
    cmdStrList = ["ssh " + hostname,
                  "\"JAVA_HOME=" + javaHome,
                  kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain",
                  "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
                  zkQueryStr + " 2> /dev/null | tail -1\""]
    cmdStr = " ".join(cmdStrList)
    logger.debug("executing command [" + cmdStr + "]", extra=d)

    subproc = system_test_utils.sys_call_return_subproc(cmdStr)
    for line in subproc.stdout.readlines():
        logger.debug("zk returned : " + line, extra=d)
        if "\"leader\"" not in line:
            continue

        json_data = json.loads(line.rstrip('\n'))
        if 'leader' in json_data:
            brokerid = str(json_data['leader'])

        leaderDict["brokerid"] = brokerid
        leaderDict["topic"] = topics[0]
        leaderDict["partition"] = '0'
        leaderDict["entity_id"] = system_test_utils.get_data_by_lookup_keyval(
            tcConfigsList, "broker.id", brokerid, "entity_id")
        leaderDict["hostname"] = system_test_utils.get_data_by_lookup_keyval(
            clusterConfigsList, "entity_id", leaderDict["entity_id"], "hostname")
        # only the first line carrying leader info is used
        break

    if not leaderDict:
        # make the empty result visible in the log instead of failing later
        # in the caller with an unexplained missing key
        logger.warning("no leader info returned by zookeeper for topic [" + topics[0] + "] partition [0]", extra=d)

    logger.debug("leader attributes : " + str(leaderDict), extra=d)
    return leaderDict