# Experiment result aggregation and reporting script.
from collections import defaultdict
|
|
import json
|
|
import os
|
|
import pandas as pd
|
|
import shutil
|
|
from loguru import logger
|
|
import prettytable
|
|
|
|
|
|
def synthesis(df: pd.DataFrame, domains: list[str], basic: bool = False):
    """Aggregate per-domain result rows of *df* into one summary record.

    Args:
        df: DataFrame with one row per domain, carrying at least the columns
            "Domain", "#Test" and "%Success Rate" (plus "#Success",
            "#Success Steps" and "#Total Steps" unless ``basic`` is True).
        domains: domain names to include in the aggregate.
        basic: when True, return only the test count and success rate.

    Returns:
        dict of aggregated metrics; ratio fields are ``None`` whenever the
        corresponding denominator is zero.
    """
    valid_df = df[df["Domain"].isin(domains)]

    # Hoist each column sum so it is computed once (the original recomputed
    # some of them up to three times per call).
    n_test = valid_df["#Test"].sum()
    # Success rate is the #Test-weighted average of the per-domain rates.
    # Guard on the actual denominator (n_test), not just on the slice being
    # non-empty: rows with #Test == 0 would otherwise divide by zero.
    success_rate = (valid_df["%Success Rate"] * valid_df["#Test"]).sum() / n_test if n_test > 0 else None

    if basic:
        return {
            "#Test": n_test,
            "%Success Rate": success_rate,
        }

    n_success = valid_df["#Success"].sum()
    success_steps = valid_df["#Success Steps"].sum()
    total_steps = valid_df["#Total Steps"].sum()
    n_failure = n_test - n_success

    avg_success_length = success_steps / n_success if n_success > 0 else None
    avg_failure_length = (total_steps - success_steps) / n_failure if n_failure > 0 else None

    return {
        "#Test": n_test,
        "#Success": n_success,
        "%Success Rate": success_rate,
        "#Success Steps": success_steps,
        "#Total Steps": total_steps,
        "Avg. Success Length": avg_success_length,
        "Avg. Failure Length": avg_failure_length,
    }
|
|
|
|
|
|
def prettytable_df(df: pd.DataFrame):
    """Render *df* as a PrettyTable with single-line borders and floats to 2 dp."""
    table = prettytable.PrettyTable()
    table.field_names = df.columns
    for record in df.itertuples(index=False):
        table.add_row(list(record))
    table.set_style(prettytable.TableStyle.SINGLE_BORDER)
    table.float_format = ".2"
    return table
|
|
|
|
def check_turn_folder_exsitence(folder_path: str):
    """Return True if *folder_path* contains any entry named ``turn_*``."""
    return any(entry.startswith("turn_") for entry in os.listdir(folder_path))
|
|
|
|
def get_result_from_folder(target_dir, target_domain: str, print_details: bool, show_single_result:int,turn_id:int, version_id:int, task_file: str):
    """Scan one experiment output directory, print summary tables, return raw results.

    Args:
        target_dir: root of the experiment output; may contain ``turn_*``
            sub-folders (one per turn/version) or domain folders directly.
        target_domain: a single domain name to restrict the scan, or "all".
        print_details: when True, print per-example scores and missing-file errors.
        show_single_result: accepted for CLI compatibility; not read in this
            function.
        turn_id: which ``turn_<id>`` sub-folder to scan (when turn folders exist).
        version_id: appended as ``_version_<id>`` to the turn folder when > 0.
        task_file: optional JSON file overriding the task reference list.

    Returns:
        ``(domain_result, all_result)`` where ``domain_result`` maps
        domain -> example_id -> best score and ``all_result`` is the flat list
        of those scores, or ``None`` when there is nothing to report.

    Side effects: writes ``manifest.json`` into the scanned folder and
    best-effort copies the trajectory-viewer HTML assets next to it.
    """

    if not os.path.exists(target_dir):
        print("?New experiment, no result yet.")
        return None

    # Pick the task reference list. Windows experiments, "rlrollout" runs and
    # an explicit task_file each override the default test set (later checks
    # win, so task_file has the highest priority).
    if "windows" in target_dir.lower():
        with open("evaluation_examples_windows/test_all_windows.json", "r") as f:
            all_reference = json.load(f)
    else:
        with open("evaluation_examples/test_all.json", "r") as f:
            all_reference = json.load(f)
    if "rlrollout" in target_dir.lower():
        with open("evaluation_examples/rl_tasks0612.json", "r") as f:
            all_reference = json.load(f)
    if task_file is not None:
        with open(task_file, "r") as f:
            all_reference = json.load(f)
    # Known-bad tests (domain -> [example_id]) are skipped; the file is optional.
    try:
        with open("evaluation_examples/bad_tests.json", "r") as f:
            bad_tests = json.load(f)
    except FileNotFoundError:
        print("No 'bad_tests.json' found. Continue without bad tests.")
        bad_tests = {}

    all_result = []                            # flat list of best scores, one per evaluated example
    domain_result = defaultdict(dict)          # domain -> example_id -> best score seen
    domain_length = defaultdict(dict)          # domain -> example_id -> step count (successes preferred)
    domain_length_success = defaultdict(dict)  # step counts of successful runs only
    domain_length_failure = defaultdict(dict)  # step counts of failed runs only
    manifest = {"domains": []}                 # consumed by the HTML trajectory viewer

    # Multi-turn experiments keep results under turn_<t>[_version_<v>].
    if check_turn_folder_exsitence(target_dir):
        sub_folder=f"turn_{turn_id}"
        if version_id > 0:
            sub_folder+=f"_version_{version_id}"
        target_turn_dir = os.path.join(target_dir, sub_folder)
        if not os.path.exists(target_turn_dir):
            print(f"Target directory {target_turn_dir} does not exist.")
            return None
    else:
        target_turn_dir = target_dir

    print(f"Check directory: {target_turn_dir}")

    for domain in os.listdir(target_turn_dir):
        if target_domain != "all" and domain != target_domain:
            continue
        domain_path = os.path.join(target_turn_dir, domain)
        if not os.path.isdir(domain_path):
            continue

        manifest_domain = {"name": domain, "trajectories": []}
        # Only examples listed in the reference set are considered; extra
        # folders on disk are ignored.
        for example_id in all_reference[domain]:
            if example_id in bad_tests.get(domain, []):
                continue
            example_path = os.path.join(domain_path, example_id)
            if not os.path.exists(example_path):
                continue
            if os.listdir(example_path): # If the folder is not empty
                manifest_domain["trajectories"].append(example_id)
            # Both result.txt and traj.jsonl must exist to score the example.
            if "result.txt" not in os.listdir(example_path):
                if print_details:
                    print(f"{example_id}: ERROR, no result.txt")
                continue
            if "traj.jsonl" not in os.listdir(example_path):
                if print_details:
                    print(f"{example_id}: ERROR, no traj.jsonl")
                continue
            # NOTE(review): this file handle is never closed explicitly.
            result = open(os.path.join(example_path, "result.txt"), "r").read()
            # result.txt holds either a float score or a True/False literal.
            try:
                result = float(result)
            # NOTE(review): bare except also swallows KeyboardInterrupt etc.
            except:
                if result.strip() in {"True", "true"}:
                    result = 1.0
                elif result.strip() in {"False", "false"}:
                    result = 0.0
                else:
                    logger.error(f"domain: {domain}, example_id: {example_id}, result: {result}")
                    logger.exception(f"Unknown result: {result}")
                    # raise ValueError("Unknown result:", result)
                    continue
            if print_details:
                print(f"{example_id}: {result}")
            # if domain == "chrome" and result > 0.5:
            #     print(f"{turn_num}: {example_id}")
            # Keep the best score per example across repeated scans.
            if example_id not in domain_result[domain]:
                domain_result[domain][example_id] = result
            else:
                domain_result[domain][example_id] = max(domain_result[domain][example_id], result)

            with open(os.path.join(example_path, "traj.jsonl"), "r") as f:
                traj = [json.loads(line) for line in f]
            # Walk backwards from the end of the trajectory to the last entry
            # that records a "step_num".
            step_num_line = -1

            while "step_num" not in traj[step_num_line]:
                step_num_line-=1

            # Record the step count; a successful run's length overrides a
            # previously recorded failed run's length.
            if example_id not in domain_length[domain] or result > 0.5:
                domain_length[domain][example_id] = traj[step_num_line]["step_num"]

            if result > 0.5: # The success threshold is temporarily 0.5
                domain_length_success[domain][example_id] = traj[step_num_line]["step_num"]
            else:
                domain_length_failure[domain][example_id] = traj[step_num_line]["step_num"]

            all_result.append(domain_result[domain][example_id])

        if len(manifest_domain["trajectories"]) > 0:
            manifest["domains"].append(manifest_domain)

    # Emit the manifest + static viewer assets for browsing trajectories.
    with open(os.path.join(target_turn_dir, "manifest.json"), "w") as f:
        json.dump(manifest, f, indent=2)
    try:
        shutil.copy("html/trajectory/single_exp/index.html", os.path.join(target_turn_dir, "index.html"))
        shutil.copy("html/trajectory/single_exp/marked.min.js", os.path.join(target_turn_dir, "marked.min.js"))
    except FileNotFoundError:
        # Viewer assets are optional; scoring proceeds without them.
        pass

    if len(all_result) == 0:
        print("New experiment, no result yet.")
        return None

    # --- Per-domain summary table ---
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    df = pd.DataFrame([
        {
            "Domain": domain,
            "#Test": len(list(domain_result[domain].values())),
            "#Success":len(domain_length_success[domain].values()),
            "%Success Rate": sum(list(domain_result[domain].values())) / len(list(domain_result[domain].values())) * 100,
            "#Success Steps": sum(domain_length_success[domain].values()),
            "#Total Steps": sum(list(domain_length[domain].values())),
            # "Avg. Length": sum(domain_length[domain].values()) / len(domain_length[domain].values()) if len(domain_length[domain].values()) > 0 else None,
            "Avg. Success Length": sum(domain_length_success[domain].values()) / len(domain_length_success[domain].values()) if len(domain_length_success[domain].values()) > 0 else None,
            "Avg. Failure Length": sum(domain_length_failure[domain].values()) / len(domain_length_failure[domain].values()) if len(domain_length_failure[domain].values()) > 0 else None,
        } for domain in domain_result.keys()
    ])
    print(prettytable_df(df))

    # --- Grouped (category-level) summary table ---
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    if "windows" in target_dir.lower():
        s1_df = pd.DataFrame([
            # {"Domain": "OS", **synthesis(df, ["os"])},
            {"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"])},
            {"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"])},
            {"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"])},
            # {"Domain": "Workflow", **synthesis(df, ["multi_apps"])},
            ### windows_specifed below
            {"Domain": "Windows Calc", **synthesis(df, ["windows_calc"])},
            {"Domain": "Clock", **synthesis(df, ["clock"])},
            {"Domain": "File_Explorer", **synthesis(df, ["file_explorer"])},
            {"Domain": "Microsoft_Paint", **synthesis(df, ["microsoft_paint"])},
            {"Domain": "Msedge", **synthesis(df, ["msedge"])},
            {"Domain": "Notepad", **synthesis(df, ["notepad"])},
            {"Domain": "Settings", **synthesis(df, ["settings"])},
        ])
    else:
        s1_df = pd.DataFrame([
            {"Domain": "OS", **synthesis(df, ["os"])},
            {"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"])},
            {"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"])},
            {"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"])},
            {"Domain": "Workflow", **synthesis(df, ["multi_apps"])},
        ])
    print(prettytable_df(s1_df))

    # --- Overall totals ---
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    print(f"Total: {len(all_result)}\t Steps: {sum(df['#Total Steps'])}")
    print(f"Success Rate: {sum(all_result) / len(all_result) * 100:.2f}")
    total_df = pd.DataFrame([
        {"Domain": "Total", **synthesis(df, ["os", "libreoffice_calc", "libreoffice_impress", "libreoffice_writer",
                                             "vlc", "thunderbird", "chrome", "gimp", "vs_code", "multi_apps","windows_calc", "clock", "file_explorer", "microsoft_paint", "msedge", "notepad", "settings"])}
    ])
    print(prettytable_df(total_df))
    return domain_result, all_result
|
|
|
|
|
|
def domain_results_union(drs: list):
    """Combine several experiments' results, scoring each task by its best run.

    Args:
        drs: list of ``domain -> example_id -> score`` mappings, as returned
            (first element) by ``get_result_from_folder``. A task missing from
            a run counts as score 0 in that run.

    Prints a per-domain table, a category-level table and the overall success
    rate for the union. Returns nothing.
    """
    union = defaultdict(dict)

    # Collect every domain seen in any run.
    domains = set()
    for dr in drs:
        domains.update(dr.keys())
    # For each task, keep the best score across all runs.
    for domain in domains:
        tasks = set()
        for dr in drs:
            tasks.update(dr.get(domain, {}).keys())
        for task in tasks:
            union[domain][task] = max(dr.get(domain, {}).get(task, 0) for dr in drs)

    # Guard: with no results at all, the tables and the final success-rate
    # division below would fail.
    if not union:
        print("New experiment, no result yet.")
        return

    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    rows = []
    for domain, scores in union.items():
        values = list(scores.values())
        rows.append({
            "Domain": domain,
            # BUG FIX: this column was named "#Test Cases", but synthesis()
            # reads the "#Test" column, so every synthesis() call below raised
            # KeyError. Name it "#Test", consistent with get_result_from_folder.
            "#Test": len(values),
            "%Success Rate": sum(values) / len(values) * 100,
        })
    df = pd.DataFrame(rows)
    print(prettytable_df(df))

    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    s1_df = pd.DataFrame([
        {"Domain": "OS", **synthesis(df, ["os"], basic=True)},
        {"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"], basic=True)},
        {"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"], basic=True)},
        {"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"], basic=True)},
        {"Domain": "Workflow", **synthesis(df, ["multi_apps"], basic=True)},
    ])
    print(prettytable_df(s1_df))

    all_result = [score for scores in union.values() for score in scores.values()]
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    print(f"Total: {len(all_result)}")
    print(f"Success Rate: {sum(all_result) / len(all_result) * 100:.2f}")
|
|
|
|
|
|
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser("Show result of the experiment.")
    parser.add_argument("override_path", nargs='+', type=str, help="One or more result directories.")
    parser.add_argument("--task_file", type=str, default=None, help="The task file to use for the experiment.")
    parser.add_argument("--show_single_result", type=int, default=0)
    parser.add_argument("--domain", type=str, default="all")
    parser.add_argument("--print_details", action="store_true")
    parser.add_argument("--t", type=int, default=1, help="The turn id to show the result.")
    parser.add_argument("--v", type=int, default=0, help="The version id to show the result. Just use for previous result, no need to use in the new experiment.")
    args = parser.parse_args()

    if len(args.override_path) == 1:
        # Single experiment: just print its tables.
        get_result_from_folder(args.override_path[0], args.domain, args.print_details, args.show_single_result, args.t, args.v, args.task_file)
    else:
        # Multiple experiments: collect each run's results, then report their union.
        drs = []
        for override_path in args.override_path:
            # BUG FIX: get_result_from_folder returns None for empty/missing
            # experiments; the old `dr, _ = get_result_from_folder(...)` raised
            # TypeError on unpack before its None-check could run.
            res = get_result_from_folder(override_path, args.domain, args.print_details, args.show_single_result, args.t, args.v, args.task_file)
            if res is not None:
                drs.append(res[0])
        # Only union when at least one run produced results.
        if drs:
            domain_results_union(drs)
        else:
            print("New experiment, no result yet.")
|