OSWorld/show_result_opencua.py
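"""Summarize OSWorld experiment results.

Reads per-task `result.txt` / `traj.jsonl` files from one or more result
directories, prints per-domain and grouped success-rate tables, and, when
several directories are given, reports the per-task union of best scores.
"""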

from collections import defaultdict
import json
import os
import shutil

import pandas as pd
import prettytable
from loguru import logger


def synthesis(df: pd.DataFrame, domains: list[str], basic: bool = False):
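    """Aggregate the rows of `df` whose Domain is in `domains` into one summary row.

    The combined success rate is weighted by each domain's test count. With
    `basic=True`, only the test count and success rate are returned (the
    step-count columns are not required in `df`).
    """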
    valid_df = df[df["Domain"].isin(domains)]
    n_test = sum(valid_df["#Test"])
    success_rate = sum(valid_df["%Success Rate"] * valid_df["#Test"]) / n_test if not valid_df.empty else None
    if basic:
        return {
            "#Test": n_test,
            "%Success Rate": success_rate,
        }
    n_success = sum(valid_df["#Success"])
    n_failure = n_test - n_success
    success_steps = sum(valid_df["#Success Steps"])
    total_steps = sum(valid_df["#Total Steps"])
    avg_success_length = success_steps / n_success if n_success > 0 else None
    avg_failure_length = (total_steps - success_steps) / n_failure if n_failure > 0 else None
    return {
        "#Test": n_test,
        "#Success": n_success,
        "%Success Rate": success_rate,
        "#Success Steps": success_steps,
        "#Total Steps": total_steps,
        "Avg. Success Length": avg_success_length,
        "Avg. Failure Length": avg_failure_length,
    }


def prettytable_df(df: pd.DataFrame):
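    """Render a DataFrame as a single-border PrettyTable with two-decimal floats."""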
    table = prettytable.PrettyTable()
    table.field_names = df.columns
    for _, row in df.iterrows():
        table.add_row(row)
    table.set_style(prettytable.TableStyle.SINGLE_BORDER)
    table.float_format = ".2"
    return table


def check_turn_folder_existence(folder_path: str):
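    """Return True if `folder_path` contains any `turn_*` sub-folder."""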
    for sub_folder in os.listdir(folder_path):
        if sub_folder.startswith("turn_"):
            return True
    return False


def get_result_from_folder(target_dir, target_domain: str, print_details: bool, show_single_result: int, turn_id: int, version_id: int, task_file: str):
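    """Collect and print the results stored under one experiment directory.

    Returns a `(domain_result, all_result)` tuple, or None when the directory
    has no results yet. Note: `show_single_result` is currently unused.
    """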
    if not os.path.exists(target_dir):
        print("New experiment, no result yet.")
        return None
    if "windows" in target_dir.lower():
        with open("evaluation_examples_windows/test_all_windows.json", "r") as f:
            all_reference = json.load(f)
    else:
        with open("evaluation_examples/test_all.json", "r") as f:
            all_reference = json.load(f)
    if "rlrollout" in target_dir.lower():
        with open("evaluation_examples/rl_tasks0612.json", "r") as f:
            all_reference = json.load(f)
    if task_file is not None:
        with open(task_file, "r") as f:
            all_reference = json.load(f)
    try:
        with open("evaluation_examples/bad_tests.json", "r") as f:
            bad_tests = json.load(f)
    except FileNotFoundError:
        print("No 'bad_tests.json' found. Continuing without bad tests.")
        bad_tests = {}
    all_result = []
    domain_result = defaultdict(dict)
    domain_length = defaultdict(dict)
    domain_length_success = defaultdict(dict)
    domain_length_failure = defaultdict(dict)
    manifest = {"domains": []}
    if check_turn_folder_existence(target_dir):
        sub_folder = f"turn_{turn_id}"
        if version_id > 0:
            sub_folder += f"_version_{version_id}"
        target_turn_dir = os.path.join(target_dir, sub_folder)
        if not os.path.exists(target_turn_dir):
            print(f"Target directory {target_turn_dir} does not exist.")
            return None
    else:
        target_turn_dir = target_dir
    print(f"Checking directory: {target_turn_dir}")
    for domain in os.listdir(target_turn_dir):
        if target_domain != "all" and domain != target_domain:
            continue
        domain_path = os.path.join(target_turn_dir, domain)
        if not os.path.isdir(domain_path):
            continue
        manifest_domain = {"name": domain, "trajectories": []}
        for example_id in all_reference[domain]:
            if example_id in bad_tests.get(domain, []):
                continue
            example_path = os.path.join(domain_path, example_id)
            if not os.path.exists(example_path):
                continue
            if os.listdir(example_path):  # Skip empty trajectory folders.
                manifest_domain["trajectories"].append(example_id)
                if "result.txt" not in os.listdir(example_path):
                    if print_details:
                        print(f"{example_id}: ERROR, no result.txt")
                    continue
                if "traj.jsonl" not in os.listdir(example_path):
                    if print_details:
                        print(f"{example_id}: ERROR, no traj.jsonl")
                    continue
                with open(os.path.join(example_path, "result.txt"), "r") as f:
                    result = f.read()
                try:
                    result = float(result)
                except ValueError:
                    # Some evaluators write booleans instead of scores.
                    if result.strip() in {"True", "true"}:
                        result = 1.0
                    elif result.strip() in {"False", "false"}:
                        result = 0.0
                    else:
                        logger.error(f"domain: {domain}, example_id: {example_id}, result: {result}")
                        logger.exception(f"Unknown result: {result}")
                        continue
                if print_details:
                    print(f"{example_id}: {result}")
                # Keep the best score seen for this task across reruns.
                if example_id not in domain_result[domain]:
                    domain_result[domain][example_id] = result
                else:
                    domain_result[domain][example_id] = max(domain_result[domain][example_id], result)
                with open(os.path.join(example_path, "traj.jsonl"), "r") as f:
                    traj = [json.loads(line) for line in f]
                # Walk backwards to the last trajectory record that carries a step count.
                step_num_line = -1
                while "step_num" not in traj[step_num_line]:
                    step_num_line -= 1
                if example_id not in domain_length[domain] or result > 0.5:
                    domain_length[domain][example_id] = traj[step_num_line]["step_num"]
                if result > 0.5:  # The success threshold is temporarily 0.5.
                    domain_length_success[domain][example_id] = traj[step_num_line]["step_num"]
                else:
                    domain_length_failure[domain][example_id] = traj[step_num_line]["step_num"]
                all_result.append(domain_result[domain][example_id])
        if len(manifest_domain["trajectories"]) > 0:
            manifest["domains"].append(manifest_domain)
    # Write a manifest and viewer page so the trajectories can be browsed in a browser.
    with open(os.path.join(target_turn_dir, "manifest.json"), "w") as f:
        json.dump(manifest, f, indent=2)
    try:
        shutil.copy("html/trajectory/single_exp/index.html", os.path.join(target_turn_dir, "index.html"))
        shutil.copy("html/trajectory/single_exp/marked.min.js", os.path.join(target_turn_dir, "marked.min.js"))
    except FileNotFoundError:
        pass
    if len(all_result) == 0:
        print("New experiment, no result yet.")
        return None
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    df = pd.DataFrame([
        {
            "Domain": domain,
            "#Test": len(domain_result[domain]),
            "#Success": len(domain_length_success[domain]),
            "%Success Rate": sum(domain_result[domain].values()) / len(domain_result[domain]) * 100,
            "#Success Steps": sum(domain_length_success[domain].values()),
            "#Total Steps": sum(domain_length[domain].values()),
            "Avg. Success Length": sum(domain_length_success[domain].values()) / len(domain_length_success[domain]) if len(domain_length_success[domain]) > 0 else None,
            "Avg. Failure Length": sum(domain_length_failure[domain].values()) / len(domain_length_failure[domain]) if len(domain_length_failure[domain]) > 0 else None,
        } for domain in domain_result.keys()
    ])
    print(prettytable_df(df))
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
if "windows" in target_dir.lower():
s1_df = pd.DataFrame([
# {"Domain": "OS", **synthesis(df, ["os"])},
{"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"])},
{"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"])},
{"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"])},
# {"Domain": "Workflow", **synthesis(df, ["multi_apps"])},
### windows_specifed below
{"Domain": "Windows Calc", **synthesis(df, ["windows_calc"])},
{"Domain": "Clock", **synthesis(df, ["clock"])},
{"Domain": "File_Explorer", **synthesis(df, ["file_explorer"])},
{"Domain": "Microsoft_Paint", **synthesis(df, ["microsoft_paint"])},
{"Domain": "Msedge", **synthesis(df, ["msedge"])},
{"Domain": "Notepad", **synthesis(df, ["notepad"])},
{"Domain": "Settings", **synthesis(df, ["settings"])},
])
else:
s1_df = pd.DataFrame([
{"Domain": "OS", **synthesis(df, ["os"])},
{"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"])},
{"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"])},
{"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"])},
{"Domain": "Workflow", **synthesis(df, ["multi_apps"])},
])
print(prettytable_df(s1_df))
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
print(f"Total: {len(all_result)}\t Steps: {sum(df['#Total Steps'])}")
print(f"Success Rate: {sum(all_result) / len(all_result) * 100:.2f}")
total_df = pd.DataFrame([
{"Domain": "Total", **synthesis(df, ["os", "libreoffice_calc", "libreoffice_impress", "libreoffice_writer",
"vlc", "thunderbird", "chrome", "gimp", "vs_code", "multi_apps","windows_calc", "clock", "file_explorer", "microsoft_paint", "msedge", "notepad", "settings"])}
])
print(prettytable_df(total_df))
return domain_result, all_result
def domain_results_union(drs: list):
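    """Print the union of several runs' results, scoring each task by its best run."""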
    union = defaultdict(dict)
    domains = set()
    for dr in drs:
        domains.update(dr.keys())
    for domain in domains:
        tasks = set()
        for dr in drs:
            tasks.update(dr.get(domain, {}).keys())
        for task in tasks:
            # A task missing from a run counts as a score of 0.
            scores = [dr.get(domain, {}).get(task, 0) for dr in drs]
            union[domain][task] = max(scores)
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    # The column is named "#Test" (not "#Test Cases") so that synthesis() can read it below.
    df = pd.DataFrame([
        {
            "Domain": domain,
            "#Test": len(union[domain]),
            "%Success Rate": sum(union[domain].values()) / len(union[domain]) * 100,
        } for domain in union.keys()
    ])
    print(prettytable_df(df))
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    s1_df = pd.DataFrame([
        {"Domain": "OS", **synthesis(df, ["os"], basic=True)},
        {"Domain": "Office", **synthesis(df, ["libreoffice_calc", "libreoffice_impress", "libreoffice_writer"], basic=True)},
        {"Domain": "Daily", **synthesis(df, ["vlc", "thunderbird", "chrome"], basic=True)},
        {"Domain": "Professional", **synthesis(df, ["gimp", "vs_code"], basic=True)},
        {"Domain": "Workflow", **synthesis(df, ["multi_apps"], basic=True)},
    ])
    print(prettytable_df(s1_df))
    all_result = []
    for domain in union.keys():
        all_result.extend(union[domain].values())
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    print(f"Total: {len(all_result)}")
    print(f"Success Rate: {sum(all_result) / len(all_result) * 100:.2f}")


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser("Show result of the experiment.")
    parser.add_argument("override_path", nargs='+', type=str, help="One or more result directories.")
    parser.add_argument("--task_file", type=str, default=None, help="The task file to use for the experiment.")
    parser.add_argument("--show_single_result", type=int, default=0)
    parser.add_argument("--domain", type=str, default="all")
    parser.add_argument("--print_details", action="store_true")
    parser.add_argument("--t", type=int, default=1, help="The turn id of the result to show.")
    parser.add_argument("--v", type=int, default=0, help="The version id of the result to show. Only needed for results from older experiments.")
    args = parser.parse_args()
    if len(args.override_path) == 1:
        get_result_from_folder(args.override_path[0], args.domain, args.print_details, args.show_single_result, args.t, args.v, args.task_file)
    else:
        # With several result directories, report the per-task union of best scores.
        drs = []
        for override_path in args.override_path:
            result = get_result_from_folder(override_path, args.domain, args.print_details, args.show_single_result, args.t, args.v, args.task_file)
            if result is not None:  # Guard against directories with no results yet.
                drs.append(result[0])
        domain_results_union(drs)