Compare commits

13 Commits
main ... opencua-72

| Author | SHA1 | Date |
|---|---|---|
| | 29c423de56 | |
| | 63cb699fc0 | |
| | b41640285e | |
| | 2ce1ab31b5 | |
| | a5b15885ab | |
| | cdd7640044 | |
| | c36c2e36d9 | |
| | c487414018 | |
| | 4babaf30b3 | |
| | f998aca0b5 | |
| | 826c0ef945 | |
| | 80b80617c4 | |
| | 0f1ef6d9b7 | |
@@ -55,6 +55,7 @@ def run_single_example(agent, env, example, max_steps, instruction, args, exampl

```python
            logger.info("The episode is done.")
            break
        step_idx += 1
    time.sleep(20)  # Wait for the environment to settle
    result = env.evaluate()
    logger.info("Result: %.2f", result)
    scores.append(result)
```
@@ -186,23 +187,25 @@ def run_single_example_opencua(agent, env, example, max_steps, instruction, args

```diff
               "wb") as _f:
           _f.write(obs['screenshot'])
 
-      with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f:
+      with open(os.path.join(example_result_dir, "traj.jsonl"), "a", encoding="utf-8") as f:
           f.write(json.dumps({
               "step_num": step_idx + 1,
               "action_timestamp": action_timestamp,
               "action": action,
               "natural_language_action": info_dict.get("action"),
               "action_timestamp": action_timestamp,
               "response": response,
               "reward": reward,
               "done": done,
               "info": info,
               "screenshot_file": f"step_{step_idx + 1}_{action_timestamp}.png"
-          }))
+          }, ensure_ascii=False))
           f.write("\n")
       if done:
           logger.info("The episode is done.")
           break
       step_idx += 1
 
   time.sleep(20)  # Wait for the environment to settle
   result = env.evaluate()
   logger.info("Result: %.2f", result)
   scores.append(result)
```
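The two changes in this hunk work together: the trajectory file is opened with `encoding="utf-8"` and each record is serialised with `ensure_ascii=False`, so non-ASCII text in actions or model responses stays human-readable in `traj.jsonl`. A minimal sketch of the difference, using a made-up record for illustration:

```python
import json

record = {"action": "点击“文件”菜单"}  # hypothetical step containing non-ASCII text

# Default behaviour: every non-ASCII character is escaped to a \uXXXX sequence.
print(json.dumps(record))
# {"action": "\u70b9\u51fb\u201c\u6587\u4ef6\u201d\u83dc\u5355"}

# With ensure_ascii=False the characters are written as-is; pairing this with
# open(..., "a", encoding="utf-8") keeps the written file valid UTF-8.
print(json.dumps(record, ensure_ascii=False))
# {"action": "点击“文件”菜单"}
```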
@@ -0,0 +1,3 @@

```python
from mm_agents.opencua.opencua_agent import OpenCUAAgent

__all__ = ["OpenCUAAgent"]
```
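The new package `__init__` re-exports `OpenCUAAgent`, so callers can import it from `mm_agents.opencua` directly. A hedged sketch of constructing and driving the agent with the constructor parameters introduced later in this diff; the values shown are illustrative, and the deployment name is hypothetical:

```python
# Sketch only: argument names mirror OpenCUAAgent.__init__ as added in this PR;
# the values below are illustrative, not prescribed defaults.
from mm_agents.opencua import OpenCUAAgent

agent = OpenCUAAgent(
    model="opencua-72b",             # hypothetical deployment name
    history_type="thought_history",  # action_history | thought_history | observation_history
    max_steps=100,
    cot_level="l2",                  # l1 | l2 | l3
    coordinate_type="qwen25",        # relative | absolute | qwen25
    screen_size=(1920, 1080),
)
agent.reset()
# obs = {"screenshot": <PNG bytes from the environment>}
# response, actions, cot = agent.predict(instruction, obs)
```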
@@ -0,0 +1,470 @@

```python
"""
OpenCUA Agent Implementation

This module implements an OpenCUA agent for desktop automation tasks, building upon
existing frameworks and integrating multiple coordinate mapping systems.

Framework and Implementation Sources:
- Main framework structure follows: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/agent.py
- Agent implementation adapted from: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/aguvis_agent.py
- Qwen2.5-VL coordinate mapping from: https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py
"""

import re
import os
import ast
import time
import math
import httpx
import base64
import backoff
import logging  # needed by OpenCUAAgent.reset(), which falls back to logging.getLogger
import traceback

from loguru import logger
from typing import Dict, List, Tuple, Optional

from mm_agents.opencua.utils import (
    encode_image,
    smart_resize,
)
from mm_agents.opencua.prompts import (
    INSTRUTION_TEMPLATE,
    STEP_TEMPLATE,
    ACTION_HISTORY_TEMPLATE,
    THOUGHT_HISTORY_TEMPLATE,
    OBSERVATION_HISTORY_TEMPLATE,
    # OpenCUA-7B, 32B system prompts
    SYSTEM_PROMPT_V1_L1,
    SYSTEM_PROMPT_V1_L2,
    SYSTEM_PROMPT_V1_L3,
    # OpenCUA-72B system prompts
    build_sys_prompt,
)
```
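`parse_response_to_cot_and_action`, defined next, splits a model reply into its `## Thought` / `## Action` / `## Code` sections, takes the last fenced code block, and projects any model-space coordinates to screen pixels. A small usage sketch; the reply text is made up, and it assumes this PR's `mm_agents.opencua` package is importable:

```python
from mm_agents.opencua.opencua_agent import parse_response_to_cot_and_action

fence = "```"
sample = (
    "## Thought:\n"
    "I can see the Files icon in the dock; clicking it should open the file manager.\n\n"
    "## Action:\n"
    'Click the "Files" icon in the launcher dock.\n\n'
    "## Code:\n"
    f"{fence}python\npyautogui.click(x=0.5, y=0.5)\n{fence}"
)

action, codes, sections = parse_response_to_cot_and_action(
    sample, screen_size=(1920, 1080), coordinate_type="relative"
)
print(action)  # Click the "Files" icon in the launcher dock.
print(codes)   # ['pyautogui.click(960, 540)']  <- relative (0.5, 0.5) projected to pixels
```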
def parse_response_to_cot_and_action(input_string, screen_size, coordinate_type) -> Tuple[str, List[str], dict]:
|
||||
"""Parse response including Observation, Thought, Action and code block"""
|
||||
sections = {}
|
||||
try:
|
||||
|
||||
obs_match = re.search(r'^##\s*Observation\s*:?[\n\r]+(.*?)(?=^##\s*Thought:|^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
|
||||
if obs_match:
|
||||
sections['observation'] = obs_match.group(1).strip()
|
||||
|
||||
thought_match = re.search(r'^##\s*Thought\s*:?[\n\r]+(.*?)(?=^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
|
||||
if thought_match:
|
||||
sections['thought'] = thought_match.group(1).strip()
|
||||
|
||||
action_match = re.search(r'^##\s*Action\s*:?[\n\r]+(.*?)(?=^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
|
||||
if action_match:
|
||||
action = action_match.group(1).strip()
|
||||
sections['action'] = action.strip()
|
||||
|
||||
code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE)
|
||||
if not code_blocks:
|
||||
logger.error("No code blocks found in the input string")
|
||||
return f"<Error>: no code blocks found in the input string: {input_string}", ["FAIL"], sections
|
||||
code_block = code_blocks[-1].strip()
|
||||
sections['original_code'] = code_block
|
||||
|
||||
if "computer.wait" in code_block.lower():
|
||||
sections["code"] = "WAIT"
|
||||
return sections['action'], ["WAIT"], sections
|
||||
|
||||
elif "computer.terminate" in code_block.lower():
|
||||
lower_block = code_block.lower()
|
||||
if ("failure" in lower_block) or ("fail" in lower_block):
|
||||
sections['code'] = "FAIL"
|
||||
return code_block, ["FAIL"], sections
|
||||
elif "success" in lower_block:
|
||||
sections['code'] = "DONE"
|
||||
return code_block, ["DONE"], sections
|
||||
else:
|
||||
logger.error("Terminate action found but no specific status provided in code block")
|
||||
return f"<Error>: terminate action found but no specific status provided in code block: {input_string}", ["FAIL"], sections
|
||||
|
||||
# corrected_code = correct_pyautogui_arguments(code_block)
|
||||
corrected_code = code_block
|
||||
sections['code'] = corrected_code
|
||||
sections['code'] = project_coordinate_to_absolute_scale(corrected_code, screen_width=screen_size[0], screen_height=screen_size[1], coordinate_type=coordinate_type)
|
||||
|
||||
if ('code' not in sections or sections['code'] is None or sections['code'] == "") or ('action' not in sections or sections['action'] is None or sections['action'] == ""):
|
||||
logger.error("Missing required action or code section")
|
||||
return f"<Error>: no code parsed: {input_string}", ["FAIL"], sections
|
||||
|
||||
return sections['action'], [sections['code']], sections
|
||||
|
||||
except Exception as e:
|
||||
error_message = f"<Error>: parsing response: {str(e)}\nTraceback:\n{traceback.format_exc()}\nInput string: {input_string}"
|
||||
logger.error(error_message)
|
||||
return error_message, ['FAIL'], sections
|
||||
|
||||
def project_coordinate_to_absolute_scale(pyautogui_code_relative_coordinates, screen_width, screen_height, coordinate_type="relative"):
|
||||
"""
|
||||
Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.
|
||||
"""
|
||||
def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type):
|
||||
if coordinate_type == "relative":
|
||||
return int(round(x * screen_width)), int(round(y * screen_height))
|
||||
elif coordinate_type == "qwen25":
|
||||
height, width = smart_resize(
|
||||
height=screen_height,
|
||||
width=screen_width,
|
||||
factor=28,
|
||||
min_pixels=3136,
|
||||
max_pixels=12845056
|
||||
)
|
||||
if 0 <= x <= 1 and 0 <= y <= 1:
|
||||
# If already normalized, treat like "relative"
|
||||
return int(round(x * width)), int(round(y * height))
|
||||
return int(x / width * screen_width), int(y / height * screen_height)
|
||||
else:
|
||||
raise ValueError(f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25'].")
|
||||
|
||||
pattern = r'(pyautogui\.\w+\([^\)]*\))'
|
||||
matches = re.findall(pattern, pyautogui_code_relative_coordinates)
|
||||
|
||||
new_code = pyautogui_code_relative_coordinates
|
||||
|
||||
for full_call in matches:
|
||||
func_name_pattern = r'(pyautogui\.\w+)\((.*)\)'
|
||||
func_match = re.match(func_name_pattern, full_call, re.DOTALL)
|
||||
if not func_match:
|
||||
continue
|
||||
|
||||
func_name = func_match.group(1)
|
||||
args_str = func_match.group(2)
|
||||
|
||||
try:
|
||||
parsed = ast.parse(f"func({args_str})").body[0].value
|
||||
parsed_args = parsed.args
|
||||
parsed_keywords = parsed.keywords
|
||||
|
||||
except SyntaxError:
|
||||
return pyautogui_code_relative_coordinates
|
||||
|
||||
function_parameters = {
|
||||
'click': ['x', 'y', 'clicks', 'interval', 'button', 'duration', 'pause'],
|
||||
'rightClick': ['x', 'y', 'duration', 'tween', 'pause'],
|
||||
'middleClick': ['x', 'y', 'duration', 'tween', 'pause'],
|
||||
'doubleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'],
|
||||
'tripleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'],
|
||||
'moveTo': ['x', 'y', 'duration', 'tween', 'pause'],
|
||||
'dragTo': ['x', 'y', 'duration', 'button', 'mouseDownUp', 'pause'],
|
||||
}
|
||||
|
||||
func_base_name = func_name.split('.')[-1]
|
||||
|
||||
param_names = function_parameters.get(func_base_name, [])
|
||||
|
||||
args = {}
|
||||
for idx, arg in enumerate(parsed_args):
|
||||
if idx < len(param_names):
|
||||
param_name = param_names[idx]
|
||||
arg_value = ast.literal_eval(arg)
|
||||
args[param_name] = arg_value
|
||||
|
||||
try:
|
||||
for kw in parsed_keywords:
|
||||
param_name = kw.arg
|
||||
arg_value = ast.literal_eval(kw.value)
|
||||
args[param_name] = arg_value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing keyword arguments: {e}")
|
||||
return pyautogui_code_relative_coordinates
|
||||
|
||||
updated = False
|
||||
if 'x' in args and 'y' in args:
|
||||
try:
|
||||
x_rel = float(args['x'])
|
||||
y_rel = float(args['y'])
|
||||
x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type)
|
||||
logger.warning(f"Projecting coordinates: ({x_rel}, {y_rel}) to ({x_abs}, {y_abs}) using {coordinate_type} projection.")
|
||||
args['x'] = x_abs
|
||||
args['y'] = y_abs
|
||||
updated = True
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if updated:
|
||||
reconstructed_args = []
|
||||
for idx, param_name in enumerate(param_names):
|
||||
if param_name in args:
|
||||
arg_value = args[param_name]
|
||||
if isinstance(arg_value, str):
|
||||
arg_repr = f"'{arg_value}'"
|
||||
else:
|
||||
arg_repr = str(arg_value)
|
||||
reconstructed_args.append(arg_repr)
|
||||
else:
|
||||
break
|
||||
|
||||
used_params = set(param_names[:len(reconstructed_args)])
|
||||
for kw in parsed_keywords:
|
||||
if kw.arg not in used_params:
|
||||
arg_value = args[kw.arg]
|
||||
if isinstance(arg_value, str):
|
||||
arg_repr = f"{kw.arg}='{arg_value}'"
|
||||
else:
|
||||
arg_repr = f"{kw.arg}={arg_value}"
|
||||
reconstructed_args.append(arg_repr)
|
||||
|
||||
new_args_str = ', '.join(reconstructed_args)
|
||||
new_full_call = f"{func_name}({new_args_str})"
|
||||
new_code = new_code.replace(full_call, new_full_call)
|
||||
|
||||
return new_code
|
||||
|
||||
def transform_agnet_action_to_code_block(action):
|
||||
if any(keyword in action for keyword in ["computer.terminate", "computer.wait", "browser.select_option", "browser.clear"]):
|
||||
return f"```code\n{action}\n```"
|
||||
else:
|
||||
return f"```python\n{action}\n```"
|
||||
|
||||
class OpenCUAAgent:
|
||||
"""
|
||||
OpenCUA Agent for desktop automation tasks.
|
||||
|
||||
This class implements an OpenCUA-model-based agent that can observe
|
||||
desktop environments through screenshots and execute mouse/keyboard actions
|
||||
via PyAutoGUI to complete automation tasks.
|
||||
|
||||
Attributes:
|
||||
model (str): Name of the language model being used
|
||||
history_type (str): Type of history recording mechanism
|
||||
actions (list): History of executed actions
|
||||
observations (list): History of environment observations
|
||||
cots (list): Chain of thought reasoning records
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
model: str, # OpenCUA model name
|
||||
history_type: str, # History step type: action_history, thought_history, observation_history
|
||||
max_steps: int, # The max number of steps to finish the task
|
||||
max_image_history_length: int = 3, # The max number of images in the history
|
||||
platform: str = "ubuntu", # The platform of the computer
|
||||
max_tokens: int = 1500, # The max number of tokens in the response
|
||||
top_p: float = 0.9, # The top p value in the response
|
||||
temperature: float = 0, # The temperature value in the response
|
||||
action_space: str = "pyautogui", # The action space: pyautogui
|
||||
observation_type: str = "screenshot", # The observation type: screenshot
|
||||
cot_level: str = "l2", # The CoT level: l1, l2, l3
|
||||
screen_size: Tuple[int, int] = (1920, 1080), # The screen size
|
||||
coordinate_type: str = "relative", # The coordinate type: relative, absolute, qwen25
|
||||
use_old_sys_prompt: bool = False, # Whether to use the old system prompt
|
||||
password="osworld-public-evaluation", # The password for the ubuntu platform
|
||||
**kwargs
|
||||
):
|
||||
assert coordinate_type in ["relative", "absolute", "qwen25"]
|
||||
assert action_space in ["pyautogui"], "Invalid action space"
|
||||
assert observation_type in ["screenshot"], "Invalid observation type"
|
||||
assert history_type in ["action_history", "thought_history", "observation_history"]
|
||||
assert model is not None, "Model cannot be None"
|
||||
|
||||
self.model = model
|
||||
self.platform = platform
|
||||
self.max_tokens = max_tokens
|
||||
self.top_p = top_p
|
||||
self.temperature = temperature
|
||||
self.action_space = action_space
|
||||
self.observation_type = observation_type
|
||||
self.history_type = history_type
|
||||
self.coordinate_type = coordinate_type
|
||||
self.cot_level = cot_level
|
||||
self.screen_size = screen_size
|
||||
self.max_image_history_length = max_image_history_length
|
||||
self.max_steps = max_steps
|
||||
self.password = password
|
||||
|
||||
if history_type == "action_history":
|
||||
self.HISTORY_TEMPLATE = ACTION_HISTORY_TEMPLATE
|
||||
elif history_type == "thought_history":
|
||||
self.HISTORY_TEMPLATE = THOUGHT_HISTORY_TEMPLATE
|
||||
elif history_type == "observation_history":
|
||||
self.HISTORY_TEMPLATE = OBSERVATION_HISTORY_TEMPLATE
|
||||
else:
|
||||
raise ValueError(f"Invalid history type: {history_type}")
|
||||
|
||||
if use_old_sys_prompt:
|
||||
if cot_level == "l1":
|
||||
self.system_prompt = SYSTEM_PROMPT_V1_L1
|
||||
elif cot_level == "l2":
|
||||
self.system_prompt = SYSTEM_PROMPT_V1_L2
|
||||
elif cot_level == "l3":
|
||||
self.system_prompt = SYSTEM_PROMPT_V1_L3
|
||||
else:
|
||||
raise ValueError("Invalid cot_level. Choose from 'l1', 'l2', or 'l3'.")
|
||||
else:
|
||||
self.system_prompt = build_sys_prompt(
|
||||
level=self.cot_level,
|
||||
password=self.password,
|
||||
use_random=False
|
||||
)
|
||||
|
||||
self.actions = []
|
||||
self.observations = []
|
||||
self.cots = []
|
||||
|
||||
```python
    def reset(self, _logger=None):
        global logger
        # Fall back to the standard-library logger when no logger is injected.
        logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent")

        self.observations = []
        self.cots = []
        self.actions = []

    def _scale_scroll_for_windows(self, code: str, factor: int = 50) -> str:
        """pyautogui.scroll uses a different scale on Ubuntu and Windows, so multiply
        the scroll amount by 'factor' when running on a Windows system."""
        if self.platform.lower() != "windows":
            return code

        pattern_pos = re.compile(r'(pyautogui\.scroll\()\s*([-+]?\d+)\s*\)')
        code = pattern_pos.sub(lambda m: f"{m.group(1)}{int(m.group(2))*factor})", code)
        return code
```
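A quick illustration of the scroll rewrite above, run standalone with the same regex and the default factor of 50 (the input values are just examples):

```python
import re

factor = 50  # default used by _scale_scroll_for_windows
code = "pyautogui.scroll(-3)\npyautogui.scroll(2)"

pattern_pos = re.compile(r'(pyautogui\.scroll\()\s*([-+]?\d+)\s*\)')
print(pattern_pos.sub(lambda m: f"{m.group(1)}{int(m.group(2)) * factor})", code))
# pyautogui.scroll(-150)
# pyautogui.scroll(100)
```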
def predict(self, instruction: str, obs: Dict, **kwargs) -> Tuple[str, List[str], Dict]:
|
||||
"""
|
||||
Predict the next action(s) based on the current observation.
|
||||
"""
|
||||
if "step_idx" in kwargs:
|
||||
logger.info(f"========= {self.model} Step {kwargs['step_idx']} =======")
|
||||
else:
|
||||
logger.info(f"========================== {self.model} ===================================")
|
||||
logger.info(f"Instruction: \n{instruction}")
|
||||
|
||||
messages = []
|
||||
messages.append({
|
||||
"role": "system",
|
||||
"content": self.system_prompt
|
||||
})
|
||||
instruction_prompt = INSTRUTION_TEMPLATE.format(instruction=instruction)
|
||||
|
||||
history_step_texts = []
|
||||
for i in range(len(self.actions)):
|
||||
if i > len(self.actions) - self.max_image_history_length:
|
||||
messages.append({
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": f"data:image/png;base64,{encode_image(self.observations[i]['screenshot'])}"}
|
||||
}
|
||||
]
|
||||
})
|
||||
|
||||
history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format(
|
||||
observation=self.cots[i].get('observation'),
|
||||
thought=self.cots[i].get('thought'),
|
||||
action=self.cots[i].get('action')
|
||||
)
|
||||
|
||||
messages.append({
|
||||
"role": "assistant",
|
||||
"content": history_content
|
||||
})
|
||||
else:
|
||||
history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format(
|
||||
observation=self.cots[i].get('observation'),
|
||||
thought=self.cots[i].get('thought'),
|
||||
action=self.cots[i].get('action')
|
||||
)
|
||||
history_step_texts.append(history_content)
|
||||
if i == len(self.actions) - self.max_image_history_length:
|
||||
messages.append({
|
||||
"role":"assistant",
|
||||
"content": "\n".join(history_step_texts)
|
||||
})
|
||||
|
||||
messages.append({
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": f"data:image/png;base64,{encode_image(obs['screenshot'])}"}
|
||||
},
|
||||
{
|
||||
"type": "text",
|
||||
"text": instruction_prompt
|
||||
}
|
||||
]
|
||||
})
|
||||
|
||||
max_retry = 5
|
||||
retry_count = 0
|
||||
low_level_instruction = None
|
||||
pyautogui_actions = None
|
||||
other_cot = {}
|
||||
|
||||
while retry_count < max_retry:
|
||||
try:
|
||||
response = self.call_llm({
|
||||
"model": self.model,
|
||||
"messages": messages,
|
||||
"max_tokens": self.max_tokens,
|
||||
"top_p": self.top_p,
|
||||
"temperature": self.temperature if retry_count==0 else max(0.2, self.temperature)
|
||||
}, self.model)
|
||||
|
||||
logger.info(f"Model Output: \n{response}")
|
||||
if not response:
|
||||
logger.error("No response found in the response.")
|
||||
raise ValueError(f"No response found in the response:\n{response}.")
|
||||
|
||||
low_level_instruction, pyautogui_actions, other_cot = parse_response_to_cot_and_action(response, self.screen_size, self.coordinate_type)
|
||||
if "<Error>" in low_level_instruction or not pyautogui_actions:
|
||||
logger.error(f"Error parsing response: {low_level_instruction}")
|
||||
raise ValueError(f"Error parsing response: {low_level_instruction}")
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during message preparation: {e}")
|
||||
retry_count += 1
|
||||
if retry_count == max_retry:
|
||||
logger.error("Maximum retries reached. Exiting.")
|
||||
return str(e), ['FAIL'], other_cot
|
||||
|
||||
pyautogui_actions = [
|
||||
self._scale_scroll_for_windows(code) for code in pyautogui_actions
|
||||
]
|
||||
logger.info(f"Action: \n{low_level_instruction}")
|
||||
logger.info(f"Code: \n{pyautogui_actions}")
|
||||
|
||||
self.observations.append(obs)
|
||||
self.actions.append(low_level_instruction)
|
||||
self.cots.append(other_cot)
|
||||
|
||||
current_step = len(self.actions)
|
||||
if current_step >= self.max_steps and 'computer.terminate' not in pyautogui_actions[0].lower():
|
||||
logger.warning(f"Reached maximum steps {self.max_steps}. Forcing termination.")
|
||||
low_level_instruction = 'Fail the task because reaching the maximum step limit.'
|
||||
pyautogui_actions = ['FAIL']
|
||||
other_cot['code'] = 'FAIL'
|
||||
|
||||
return response, pyautogui_actions, other_cot
|
||||
|
||||
|
||||
def call_llm(self, payload, model):
|
||||
"""Call the LLM API"""
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {os.environ['OPENCUA_API_KEY']}"
|
||||
}
|
||||
|
||||
for _ in range(20):
|
||||
response = httpx.post(
|
||||
f"https://{self.model}.app.msh.team/v1/chat/completions",
|
||||
headers=headers,
|
||||
json=payload,
|
||||
timeout=500,
|
||||
verify=False
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
logger.error("Failed to call LLM: " + response.text)
|
||||
logger.error("Retrying...")
|
||||
time.sleep(5)
|
||||
else:
|
||||
response = response.json()
|
||||
finish_reason = response["choices"][0].get("finish_reason")
|
||||
if finish_reason is not None and finish_reason == "stop": # for most of the time, length will not exceed max_tokens
|
||||
return response['choices'][0]['message']['content']
|
||||
else:
|
||||
logger.error("LLM did not finish properly, retrying...")
|
||||
time.sleep(5)
|
||||
|
|
@@ -0,0 +1,349 @@
|
|||
import random
|
||||
|
||||
# System prompt for OpenCUA-7B, OpenCUA-32B
|
||||
# System prompts used in the training data
|
||||
SYSTEM_PROMPT_V1_L1 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip()
|
||||
SYSTEM_PROMPT_V1_L2 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip()
|
||||
SYSTEM_PROMPT_V1_L3 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nObservation:\n - Describe the current computer state based on the full screenshot in detail. \n - Application Context:\n - The active application\n - The active window or page\n - Overall layout and visible interface\n - Key Elements:\n - Menu items and toolbars \n - Buttons and controls\n - Text fields and content\n - Dialog boxes or popups\n - Error messages or notifications\n - Loading states\n - Other key elements\n - Describe any content, elements, options, information or clues that are possibly relevant to achieving the task goal, including their name, content, or shape (if possible).\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}\n".strip()
|
||||
|
||||
# Testing prompt on OSWorld-Verified
|
||||
SYSTEM_PROMPT_V1_L2 = """You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task. The password of the computer is "osworld-public-evaluation". If the task is not possible to do, output the action computer.terminate(status='failure').
|
||||
|
||||
For each step, provide your response in this format:
|
||||
|
||||
Thought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning
|
||||
|
||||
Action:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize "—", maximize "□", close "X")\n - if the action involves keyboard actions like \'press\', \'write\', \'hotkey\':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions
|
||||
|
||||
Finally, output the action as PyAutoGUI code or the following functions:
|
||||
- {"name": "computer.triple_click", "description": "Triple click on the screen", "parameters": {"type": "object", "properties": {"x": {"type": "number", "description": "The x coordinate of the triple click"}, "y": {"type": "number", "description": "The y coordinate of the triple click"}}, "required": ["x", "y"]}}
|
||||
- {"name": "computer.terminate", "description": "Terminate the current task and report its completion status", "parameters": {"type": "object", "properties": {"status": {"type": "string", "enum": ["success", "failure"], "description": "The status of the task"}}, "required": ["status"]}}
|
||||
""".strip()
|
||||
|
||||
|
||||
# SYSTEM_PROMPT for OpenCUA-72B
|
||||
general_computer_instructions = [
|
||||
"""
|
||||
You are a GUI agent. You are given a task, a screenshot of the screen and your previous interactions with the computer. You need to perform a series of actions to complete the task. The password of the computer is "{password}", use it when you need sudo rights. You need to **wait** explicitly for installation, waiting website loading or running commands to finish. Don\'t terminate the task unless you are sure the task is finished. If you find that you can\'t finish the task, or the task is not finished exactly as the instruction indicates (you have made progress but not finished the task completely), or the task is impossible to complete, you must report **failure**.
|
||||
""".strip(),
|
||||
"""
|
||||
You are acting as a GUI agent. A task description, a screenshot, and your past interactions will be supplied. Execute the necessary steps to fulfil the task. Whenever sudo operations are required, use the computer’s password "{password}". Insert an explicit **wait** after launching any installation, waiting website loading or long-running command to let it finish. Do not output terminate action unless you are certain the task is complete. If you realise the task can't be finished or is impossible to do, you should report **failure**.
|
||||
""".strip(),
|
||||
"""
|
||||
Your mission as a GUI agent is to complete the provided task using the current screen image and the history of interactions. For commands requiring elevated privileges, supply "{password}" as the sudo password. Explicitly invoke **wait** after launching any installation or command that may take time to finish. Do not terminate the session unless success is certain. If the task cannot be fully executed, or turns out impossible, you must declare **failure**.
|
||||
""".strip(),
|
||||
]
|
||||
|
||||
l3_format_instruction = """For each step, provide your response in this format:
|
||||
# Step: {step number}
|
||||
## Observation:
|
||||
{observation}
|
||||
## Thought:
|
||||
{thought}
|
||||
## Action:
|
||||
{action}
|
||||
## Code:
|
||||
{code}"""
|
||||
|
||||
l2_format_instruction = """For each step, provide your response in this format:
|
||||
# Step: {step number}
|
||||
## Thought:
|
||||
{thought}
|
||||
## Action:
|
||||
{action}
|
||||
## Code:
|
||||
{code}"""
|
||||
|
||||
l1_format_instruction = """For each step, provide your response in this format:
|
||||
# Step: {step number}
|
||||
## Action:
|
||||
{action}
|
||||
## Code:
|
||||
{code}"""
|
||||
|
||||
observation_instructions = [
|
||||
"""For the Observation section, you should include the following parts if helpful:
|
||||
- Describe the current computer state based on the full screenshot in detail.
|
||||
- Application Context:
|
||||
- The active application
|
||||
- The active window or page
|
||||
- Overall layout and visible interface
|
||||
- Key Elements:
|
||||
- Menu items and toolbars
|
||||
- Buttons and controls
|
||||
- Text fields and content
|
||||
- Dialog boxes or popups
|
||||
- Error messages or notifications
|
||||
- Loading states
|
||||
- Other key elements
|
||||
- Describe any content, elements, options, information or clues that are possibly relevant to achieving the task goal, including their name, content, or shape (if possible).
|
||||
""".strip(),
|
||||
|
||||
"""In the Observation section, outline everything visible on screen that could influence your next move:
|
||||
• Current system state as seen in the screenshot.
|
||||
• Application context:
|
||||
- Which application is running in the foreground
|
||||
- Specific window, tab, or page being displayed
|
||||
- High-level layout of panels, sidebars, and work areas
|
||||
• Salient interface elements:
|
||||
- Menus, ribbons, and toolbars
|
||||
- Actionable buttons, icons, toggles, and controls
|
||||
- Input areas such as text boxes or code editors
|
||||
- Pop-up dialogs, modals, alerts, or system notifications
|
||||
- Progress bars, spinners, or other loading indicators
|
||||
• Any text, labels, shapes, or on-screen cues that might help accomplish the task (cite names or visual traits when available).
|
||||
""".strip(),
|
||||
|
||||
# ── Variant 3 ──────────────────────────────────────────────────────────
|
||||
"""Write the Observation section as a thorough snapshot of the UI:
|
||||
- Start with a full-screen description: what the user sees at a glance.
|
||||
- Give application details: title, active workspace, and structural layout.
|
||||
- Enumerate critical elements:
|
||||
* Navigation menus and context bars
|
||||
* Primary and secondary buttons or icons
|
||||
* Editable fields, lists, tables, or rich-text areas
|
||||
* Dialogs, pop-ups, warnings, or confirmations
|
||||
* Indicators of loading or processing activity
|
||||
- Note any evidence, hints, or data (textual or visual) that could guide the task toward completion, referencing names, colors, shapes, or positions when explicit identifiers are missing.
|
||||
""".strip(),
|
||||
]
|
||||
|
||||
thought_instructions = [
|
||||
"""For the Thought section, you should include the following parts:
|
||||
- Reflection on the task when there is previous action:
|
||||
- Consider the correctness of the previous action and its outcomes
|
||||
- If the previous action was correct, describe the change in the state of the computer and reason
|
||||
- If the previous action was incorrect, reflect on what went wrong and why
|
||||
- Step by Step Progress Assessment:
|
||||
- Add necessary information according to the history screenshots, former actions and current screenshot.
|
||||
- Analyze what parts of the task have already been completed and how they contribute to the overall goal.
|
||||
- Make a plan on how to complete the task based on the history and current screenshot.
|
||||
- Next Action Prediction:
|
||||
- Propose the most possible next action and state the reason
|
||||
- For Text Input Actions:
|
||||
- Note current cursor position
|
||||
- Consolidate repetitive actions (specify count for multiple keypresses)
|
||||
- Describe expected final text outcome
|
||||
- Use first-person perspective in reasoning
|
||||
""".strip(),
|
||||
|
||||
"""
|
||||
In the **Thought** block, cover these topics:
|
||||
|
||||
1. **Last-Step Reflection** (when a prior action exists)
|
||||
• Was my previous action correct? What evidence shows this?
|
||||
• If it succeeded, what state change occurred and why?
|
||||
• If it failed, where did I go wrong?
|
||||
|
||||
2. **Incremental Progress Audit**
|
||||
• Which sub-tasks are completed and how do they advance the mission?
|
||||
• Make a plan to finish the task based on past actions and the current UI state.
|
||||
|
||||
3. **Foresight for the Coming Action**
|
||||
• Predict the most logical next step.
|
||||
• State the reason why it is the best choice given the current context.
|
||||
|
||||
4. **Guidance for Text Entry**
|
||||
• Note the cursor location
|
||||
• Compress multiple identical keystrokes (e.g., “press Backspace ×3”)
|
||||
• Clarify the exact text expected after input
|
||||
|
||||
Use first-person inner dialogue throughout.
|
||||
""".strip(),
|
||||
|
||||
"""
|
||||
Compose your **Thought** section as an internal monologue that includes:
|
||||
|
||||
- **Retrospective** (if a prior step exists):
|
||||
* Evaluate the accuracy and effect of the last action.
|
||||
* If it was successful, reason about the resulting interface change.
|
||||
* If it was faulty, diagnose the misstep and its cause.
|
||||
|
||||
- **Ongoing Progress Evaluation**:
|
||||
* Outline which parts of the task are done and their impact on the overall objective.
|
||||
* Suggest a plan to complete the task based on past history and the current screen.
|
||||
|
||||
- **Decision Framework for the Next Move**:
|
||||
* Brainstorm possible next action given the present state.
|
||||
* Explain why this action is the most logical choice.
|
||||
|
||||
- **Special Rules for Keyboard Input**:
|
||||
* Specify current cursor focus or field.
|
||||
* Merge repeated keypresses into counts for brevity.
|
||||
* Describe the intended final text after typing.
|
||||
|
||||
Maintain a first-person voice for clarity of reasoning.
|
||||
""".strip(),
|
||||
]
|
||||
|
||||
action_instructions = [
|
||||
"""For the action section, you should provide clear, concise, and actionable instructions in one sentence.
|
||||
- If the action involves interacting with a specific target:
|
||||
- Describe target explicitly (if multiple elements share that name, you should distinguish the target) without using coordinates
|
||||
- Specify element names when possible (use original language if non-English)
|
||||
- Describe features (shape, color, position) if name unavailable
|
||||
- If the action involves keyboard actions like 'press', 'write', 'hotkey':
|
||||
- Consolidate repetitive keypresses with count
|
||||
- Specify expected text outcome for typing actions
|
||||
""".strip(),
|
||||
|
||||
"""
|
||||
Write the **Action** in one short, direct sentence.
|
||||
|
||||
• When clicking or otherwise interacting with a UI element:
|
||||
- Name the element explicitly — and, if multiple elements share that name, add a distinguishing detail.
|
||||
- Do **not** give coordinates.
|
||||
- Use the element's label (keep original language when it isn't English).
|
||||
- If unnamed, describe recognisable traits (shape, colour, on-screen position).
|
||||
|
||||
• When using the keyboard (press, type, hotkey):
|
||||
- Collapse repeated key presses into counts.
|
||||
- For typing, specify the text that should appear.
|
||||
""".strip(),
|
||||
|
||||
"""
|
||||
Provide the **Action** as a single, crisp imperative sentence.
|
||||
|
||||
- Mouse/GUI interactions:
|
||||
* Identify the target by name, and if duplicate names exist, clarify which one you mean.
|
||||
* Do not supply XY coordinates.
|
||||
* Preserve non-English labels verbatim.
|
||||
* If unnamed, describe the element's look or location (colour, shape, relative position).
|
||||
|
||||
- Keyboard operations (press, write, hotkey):
|
||||
* Combine repeated keystrokes with a multiplier.
|
||||
* State the exact text that will be entered.
|
||||
""".strip(),
|
||||
]
|
||||
|
||||
code_instrucion = """For the code section, you should output the corresponding code for the action. The code should be either PyAutoGUI code or one of the following functions wrapped in the code block:
|
||||
- {"name": "computer.wait", "description": "Make the computer wait for 20 seconds for installation, running code, etc.", "parameters": {"type": "object", "properties": {}, "required": []}}
|
||||
- {"name": "computer.terminate", "description": "Terminate the current task and report its completion status", "parameters": {"type": "object", "properties": {"status": {"type": "string", "enum": ["success", "failure"], "description": "The status of the task"}, {"answer": {"type": "string", "description": "The answer of the task"}}, "required": ["status"]}}
|
||||
Examples for the code section:
|
||||
```python
|
||||
pyautogui.click(x=123, y=456)
|
||||
```
|
||||
```code
|
||||
computer.terminate(status="success")
|
||||
```
|
||||
```code
|
||||
computer.terminate(status="success", answer='''text''')
|
||||
```"""
|
||||
|
||||
SYSTEM_PROMPT_V2_L1 = """
|
||||
{general_computer_instruction}
|
||||
|
||||
{format_instruction}
|
||||
|
||||
{action_instruction}
|
||||
|
||||
{code_instruction}
|
||||
""".strip()
|
||||
|
||||
SYSTEM_PROMPT_V2_L2 = """
|
||||
{general_computer_instruction}
|
||||
|
||||
{format_instruction}
|
||||
|
||||
{thought_instruction}
|
||||
|
||||
{action_instruction}
|
||||
|
||||
{code_instruction}
|
||||
""".strip()
|
||||
|
||||
SYSTEM_PROMPT_V2_L3 = """
|
||||
{general_computer_instruction}
|
||||
|
||||
{format_instruction}
|
||||
|
||||
{observation_instruction}
|
||||
|
||||
{thought_instruction}
|
||||
|
||||
{action_instruction}
|
||||
|
||||
{code_instruction}
|
||||
""".strip()
|
||||
|
||||
|
||||
def build_sys_prompt(level, password="password", use_random=False):
|
||||
if not use_random:
|
||||
if level == "l1":
|
||||
return SYSTEM_PROMPT_V2_L1.format(
|
||||
general_computer_instruction=general_computer_instructions[0].format(
|
||||
password=password
|
||||
),
|
||||
format_instruction=l1_format_instruction,
|
||||
action_instruction=action_instructions[0],
|
||||
code_instruction=code_instrucion,
|
||||
)
|
||||
elif level == "l2":
|
||||
return SYSTEM_PROMPT_V2_L2.format(
|
||||
general_computer_instruction=general_computer_instructions[0].format(
|
||||
password=password
|
||||
),
|
||||
format_instruction=l2_format_instruction,
|
||||
thought_instruction=thought_instructions[0],
|
||||
action_instruction=action_instructions[0],
|
||||
code_instruction=code_instrucion,
|
||||
)
|
||||
elif level == "l3":
|
||||
return SYSTEM_PROMPT_V2_L3.format(
|
||||
general_computer_instruction=general_computer_instructions[0].format(
|
||||
password=password
|
||||
),
|
||||
format_instruction=l3_format_instruction,
|
||||
observation_instruction=observation_instructions[0],
|
||||
thought_instruction=thought_instructions[0],
|
||||
action_instruction=action_instructions[0],
|
||||
code_instruction=code_instrucion,
|
||||
)
|
||||
else:
|
||||
raise ValueError("Invalid level. Choose from 'l1', 'l2', or 'l3'.")
|
||||
else:
|
||||
if level == "l1":
|
||||
return SYSTEM_PROMPT_V2_L1.format(
|
||||
general_computer_instruction=random.choice(
|
||||
general_computer_instructions
|
||||
),
|
||||
format_instruction=l1_format_instruction,
|
||||
action_instruction=random.choice(action_instructions),
|
||||
code_instruction=code_instrucion,
|
||||
)
|
||||
elif level == "l2":
|
||||
return SYSTEM_PROMPT_V2_L2.format(
|
||||
general_computer_instruction=random.choice(
|
||||
general_computer_instructions
|
||||
),
|
||||
format_instruction=l2_format_instruction,
|
||||
thought_instruction=random.choice(thought_instructions),
|
||||
action_instruction=random.choice(action_instructions),
|
||||
code_instruction=code_instrucion,
|
||||
)
|
||||
elif level == "l3":
|
||||
return SYSTEM_PROMPT_V2_L3.format(
|
||||
general_computer_instruction=random.choice(
|
||||
general_computer_instructions
|
||||
),
|
||||
format_instruction=l3_format_instruction,
|
||||
observation_instruction=random.choice(observation_instructions),
|
||||
thought_instruction=random.choice(thought_instructions),
|
||||
action_instruction=random.choice(action_instructions),
|
||||
code_instruction=code_instrucion,
|
||||
)
|
||||
else:
|
||||
raise ValueError("Invalid level. Choose from 'l1', 'l2', or 'l3'.")
|
||||
|
||||
|
||||
# Modeling prompt templates for generating trajectories
|
||||
STEP_TEMPLATE = "# Step {step_num}:\n"
|
||||
INSTRUTION_TEMPLATE = "# Task Instruction:\n{instruction}\n\nPlease generate the next move according to the screenshot, task instruction and previous steps (if provided).\n"
|
||||
|
||||
ACTION_HISTORY_TEMPLATE = "## Action:\n{action}\n"
|
||||
THOUGHT_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n"
|
||||
OBSERVATION_HISTORY_TEMPLATE = "## Observation:\n{observation}\n\n## Thought:\n{thought}\n\n## Action:\n{action}\n"
|
||||
|
||||
ACTION_HISTORY_TEMPLATE_WITH_CODE = "## Action:\n{action}\n\n## Code:\n{code}\n"
|
||||
THOUGHT_HISTORY_TEMPLATE_WITH_CODE = "## Thought:\n{thought}\n\n## Action:\n{action}\n\n## Code:\n{code}\n"
|
||||
OBSERVATION_HISTORY_TEMPLATE_WITH_CODE = "## Observation:\n{observation}\n\n## Thought:\n{thought}\n\n## Action:\n{action}\n\n## Code:\n{code}\n"
|
||||
|
|
@@ -0,0 +1,483 @@

```python
import re
import json      # used by call_openai_naive below
import time      # used for retry back-off in call_openai_naive
import base64
import requests  # used by call_openai_naive below
from loguru import logger
from typing import List, Optional
from PIL import Image
from io import BytesIO
import tempfile
import os
import math

def encode_image(image_content):
    return base64.b64encode(image_content).decode("utf-8")

def smart_resize(
    height: int,
    width: int,
    factor: int = 28,
    min_pixels: int = 56 * 56,
    max_pixels: int = 14 * 14 * 4 * 1280,
    max_aspect_ratio_allowed: Optional[float] = None,
    size_can_be_smaller_than_factor: bool = False,
):
    """Rescales the image so that the following conditions are met:

    1. Both dimensions (height and width) are divisible by 'factor'.

    2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].

    3. The aspect ratio of the image is maintained as closely as possible.
    """
    if not size_can_be_smaller_than_factor and (height < factor or width < factor):
        raise ValueError(
            f"height:{height} or width:{width} must be larger than factor:{factor} "
            f"(when size_can_be_smaller_than_factor is False)"
        )
    elif (
        max_aspect_ratio_allowed is not None
        and max(height, width) / min(height, width) > max_aspect_ratio_allowed
    ):
        raise ValueError(
            f"absolute aspect ratio must be smaller than {max_aspect_ratio_allowed}, "
            f"got {max(height, width) / min(height, width)} "
            f"(when max_aspect_ratio_allowed is not None)"
        )
    h_bar = max(1, round(height / factor)) * factor
    w_bar = max(1, round(width / factor)) * factor
    if h_bar * w_bar > max_pixels:
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = max(1, math.floor(height / beta / factor)) * factor
        w_bar = max(1, math.floor(width / beta / factor)) * factor
    elif h_bar * w_bar < min_pixels:
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = math.ceil(height * beta / factor) * factor
        w_bar = math.ceil(width * beta / factor) * factor
    return h_bar, w_bar
```
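For the "qwen25" coordinate type used elsewhere in this PR, `smart_resize` defines the grid the model actually sees: the model emits coordinates in the resized space, and they are projected back to physical pixels. A small sketch of that round trip; the values are chosen for illustration and it assumes the `mm_agents.opencua.utils` module from this PR is importable:

```python
from mm_agents.opencua.utils import smart_resize

# A 1920x1080 screen snaps to the nearest multiples of 28: a 1932x1092 model grid.
height, width = smart_resize(height=1080, width=1920, factor=28,
                             min_pixels=3136, max_pixels=12845056)
print(height, width)  # 1092 1932

# A model-space click at the centre of that grid maps back to the screen centre.
x_model, y_model = 966, 546
x_abs = int(x_model / width * 1920)   # 960
y_abs = int(y_model / height * 1080)  # 540
print(x_abs, y_abs)
```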
def call_openai_naive(model, payload, address_hint=None):
|
||||
"""
|
||||
Naive OpenAI API call using requests.
|
||||
"""
|
||||
# Extract fields from payload
|
||||
model = payload.get("model")
|
||||
payload["model"] = model.model_id if hasattr(model, "model_id") else "None"
|
||||
# address_hint not used here
|
||||
base_url = model.base_url
|
||||
# logger.warning(f"Base URL: {base_url}, Payload model: {payload['model']}")
|
||||
url = f"{base_url}/chat/completions"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
data = {
|
||||
**payload,
|
||||
"n": 1,
|
||||
}
|
||||
max_retry = 5
|
||||
chat_completions = None
|
||||
success = False
|
||||
while success is False and max_retry > 0:
|
||||
try:
|
||||
json_data = json.dumps(data)
|
||||
response = requests.post(
|
||||
url, headers=headers, data=json_data, timeout=120, verify=False
|
||||
)
|
||||
if response.status_code == 200:
|
||||
chat_completions = response.json()
|
||||
try:
|
||||
finish_reason = chat_completions["choices"][0].get("finish_reason")
|
||||
if (
|
||||
finish_reason is not None and finish_reason == "stop"
|
||||
): # for most of the time, length will not exceed max_tokens
|
||||
success = True
|
||||
else:
|
||||
time.sleep(5)
|
||||
max_retry -= 1
|
||||
except Exception as e:
|
||||
logger.error(f"Error in processing chat completion: {e}")
|
||||
time.sleep(5)
|
||||
max_retry -= 1
|
||||
else:
|
||||
logger.error(f"Failed to call OpenAI API: {response.text}")
|
||||
time.sleep(5)
|
||||
max_retry -= 1
|
||||
except requests.exceptions.ReadTimeout:
|
||||
# timeout is normal, don't print trace
|
||||
max_retry -= 1
|
||||
logger.warning(f"Timeout in OpenAI API call, left retries: {max_retry}")
|
||||
time.sleep(5)
|
||||
|
||||
except Exception as e:
|
||||
max_retry -= 1
|
||||
logger.exception(f"Failed to call OpenAI API: {e}")
|
||||
time.sleep(5)
|
||||
|
||||
if chat_completions is None:
|
||||
raise RuntimeError("Failed to call OpenAI API, max_retry used up")
|
||||
try:
|
||||
infos = {}
|
||||
if "choices" in chat_completions:
|
||||
infos["finish_reason"] = chat_completions["choices"][0].get("finish_reason")
|
||||
infos["n"] = len(chat_completions["choices"])
|
||||
if "tool_calls" in chat_completions["choices"][0]["message"]:
|
||||
infos["tool_calls"] = chat_completions["choices"][0]["message"][
|
||||
"tool_calls"
|
||||
]
|
||||
infos["choices"] = chat_completions["choices"] # for the case of n > 1
|
||||
if "usage" in chat_completions:
|
||||
infos["usage"] = chat_completions["usage"]
|
||||
return chat_completions["choices"][0]["message"]["content"], infos
|
||||
except Exception as e:
|
||||
logger.error(f"Error in processing chat completion {e}")
|
||||
return "", {"n": 1, "usage": 0, "finish_reason": f"error {e}"}
|
||||
|
||||
|
||||
def preprocess_for_naive_openai(self, payload):
|
||||
if isinstance(payload["model"], str):
|
||||
payload["model"] = getattr(self, "openai_client", None)
|
||||
return payload
|
||||
|
||||
def encoded_img_to_pil_img(data_str):
|
||||
base64_str = data_str.replace("data:image/png;base64,", "")
|
||||
image_data = base64.b64decode(base64_str)
|
||||
return Image.open(BytesIO(image_data))
|
||||
|
||||
|
||||
def save_to_tmp_img_file(data_str):
|
||||
base64_str = data_str.replace("data:image/png;base64,", "")
|
||||
image_data = base64.b64decode(base64_str)
|
||||
image = Image.open(BytesIO(image_data))
|
||||
|
||||
tmp_img_path = os.path.join(tempfile.mkdtemp(), "tmp_img.png")
|
||||
image.save(tmp_img_path)
|
||||
|
||||
return tmp_img_path
|
||||
|
||||
|
||||
def bbox_to_center_1000(bbox: str) -> tuple[int, int]:
|
||||
regex_list = [
|
||||
r"<\|box_start\|>\((\d+),(\d+)\),\((\d+),(\d+)\)<\|box_end\|>", # '<|box_start|>(576,12),(592,42)<|box_end|>'
|
||||
r"<\|box_start\|>\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]\]<\|box_end\|>", # '<|box_start|>[[576, 12, 592, 42]]<|box_end|>'
|
||||
r"<\|box_start\|>\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]<\|box_end\|>", # '<|box_start|>[[576, 12, 592, 42]<|box_end|>', this is actually wrong format, but we parse it anyway
|
||||
r"<\|box_start\|>\((\d+),\s*(\d+),\s*(\d+),\s*(\d+)\)<\|box_end\|>", # '<|box_start|>(576, 12, 592, 42)<|box_end|>', this is actually wrong format, but we parse it anyway
|
||||
r"\((\d+),(\d+)\),\((\d+),(\d+)\)", # Versions without the 'bbox' special tokens
|
||||
r"\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]\]",
|
||||
r"\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]",
|
||||
r"\((\d+),\s*(\d+),\s*(\d+),\s*(\d+)\)",
|
||||
]
|
||||
for regex in regex_list:
|
||||
match = re.search(regex, bbox)
|
||||
if match:
|
||||
break
|
||||
if not match:
|
||||
raise ValueError(
|
||||
f"Bounding box coordinates not found in the input string: {bbox}"
|
||||
)
|
||||
x_top_left, y_top_left, x_bottom_right, y_bottom_right = map(int, match.groups())
|
||||
x_center = (x_top_left + x_bottom_right) // 2
|
||||
y_center = (y_top_left + y_bottom_right) // 2
|
||||
return x_center, y_center
|
||||
|
||||
|
||||
def bbox_to_center_1(bbox: str) -> tuple[int, int]:
|
||||
regex_list = [
|
||||
r"\[\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*\]",
|
||||
]
|
||||
for regex in regex_list:
|
||||
match = re.search(regex, bbox)
|
||||
if match:
|
||||
break
|
||||
if not match:
|
||||
raise ValueError(
|
||||
f"Bounding box coordinates not found in the input string: {bbox}"
|
||||
)
|
||||
coordinates = tuple(map(float, match.groups()))
|
||||
coordinates = [int(coord * 1000) for coord in coordinates]
|
||||
x_center = (coordinates[0] + coordinates[2]) // 2
|
||||
y_center = (coordinates[1] + coordinates[3]) // 2
|
||||
return x_center, y_center
|
||||
|
||||
def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type):
|
||||
if coordinate_type == "relative":
|
||||
return int(round(x * screen_width)), int(round(y * screen_height))
|
||||
elif coordinate_type == "absolute":
|
||||
return x, y
|
||||
elif coordinate_type == "qwen25":
|
||||
height, width = smart_resize(
|
||||
height=screen_height,
|
||||
width=screen_width,
|
||||
factor=28,
|
||||
min_pixels=3136,
|
||||
max_pixels=12845056,
|
||||
)
|
||||
return int(x / width * screen_width), int(y / height * screen_height)
|
||||
elif coordinate_type == "relative1000":
|
||||
if screen_width == 0 or screen_height == 0:
|
||||
raise ValueError(
|
||||
"Screen width and height must be greater than zero for relative1000 coordinates."
|
||||
)
|
||||
x_abs = int(round(x * screen_width / 1000))
|
||||
y_abs = int(round(y * screen_height / 1000))
|
||||
return x_abs, y_abs
|
||||
else:
|
||||
raise ValueError(f"Unsupported coordinate type: {coordinate_type}")
|
||||
|
||||
|
||||
def rescale_coord(
|
||||
coord: tuple[int, int],
|
||||
original_width: int,
|
||||
original_height: int,
|
||||
scaled_width=1000,
|
||||
scaled_height=1000,
|
||||
) -> tuple[int, int]:
|
||||
# According to https://huggingface.co/spaces/maxiw/OS-ATLAS/blob/398c3256a4fec409a074e0e4b5ac1d1d5bf7c240/app.py#L36
|
||||
# It seems that OS-ATLAS model are rescaled to output 1000x1000 images
|
||||
# So we need to rescale the coordinates back to the original image size
|
||||
x_scale = original_width / scaled_width
|
||||
y_scale = original_height / scaled_height
|
||||
return int(coord[0] * x_scale), int(coord[1] * y_scale)


def _pyautogui_code_to_absolute_coordinates(
    pyautogui_code_relative_coordinates,
    logical_screen_size,
    coordinate_type="relative",
    model_input_size=None,
):
    """
    Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.
    """
    import re
    import ast

    if coordinate_type not in ["relative", "relative1000", "absolute", "qwen25"]:
        raise ValueError(
            f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25']."
        )

    screen_width, screen_height = logical_screen_size
    if model_input_size is not None:
        model_width, model_height = model_input_size
        width_scale, height_scale = (
            screen_width / model_width,
            screen_height / model_height,
        )
    else:
        width_scale, height_scale = 1, 1

    pattern = r"(pyautogui\.\w+\([^\)]*\))"

    matches = re.findall(pattern, pyautogui_code_relative_coordinates)

    new_code = pyautogui_code_relative_coordinates

    for full_call in matches:
        func_name_pattern = r"(pyautogui\.\w+)\((.*)\)"
        func_match = re.match(func_name_pattern, full_call, re.DOTALL)
        if not func_match:
            continue

        func_name = func_match.group(1)
        args_str = func_match.group(2)

        try:
            parsed = ast.parse(f"func({args_str})").body[0].value
            parsed_args = parsed.args
            parsed_keywords = parsed.keywords
        except SyntaxError:
            return pyautogui_code_relative_coordinates

        function_parameters = {
            "click": ["x", "y", "clicks", "interval", "button", "duration", "pause"],
            "moveTo": ["x", "y", "duration", "tween", "pause"],
            "moveRel": ["xOffset", "yOffset", "duration", "tween", "pause"],
            "dragTo": ["x", "y", "duration", "button", "mouseDownUp", "pause"],
            "dragRel": [
                "xOffset",
                "yOffset",
                "duration",
                "button",
                "mouseDownUp",
                "pause",
            ],
            "doubleClick": ["x", "y", "interval", "button", "duration", "pause"],
        }

        func_base_name = func_name.split(".")[-1]

        param_names = function_parameters.get(func_base_name, [])

        args = {}
        for idx, arg in enumerate(parsed_args):
            if idx < len(param_names):
                param_name = param_names[idx]
                arg_value = ast.literal_eval(arg)
                args[param_name] = arg_value

        try:
            for kw in parsed_keywords:
                param_name = kw.arg
                arg_value = ast.literal_eval(kw.value)
                args[param_name] = arg_value
        except Exception as e:
            logger.error(f"Error parsing keyword arguments: {e}")
            return pyautogui_code_relative_coordinates

        updated = False
        if "x" in args and "y" in args:
            try:
                x_rel = float(args["x"])
                y_rel = float(args["y"])
                x_abs, y_abs = _coordinate_projection(
                    x_rel, y_rel, screen_width, screen_height, coordinate_type
                )
                # logger.warning(f"Projecting coordinates: ({x_rel}, {y_rel}) to ({x_abs}, {y_abs}) using {coordinate_type} projection.")
                args["x"] = x_abs * width_scale
                args["y"] = y_abs * height_scale
                updated = True
            except ValueError:
                pass

        if "xOffset" in args and "yOffset" in args:
            try:
                x_rel = float(args["xOffset"])
                y_rel = float(args["yOffset"])
                x_abs, y_abs = _coordinate_projection(
                    x_rel, y_rel, screen_width, screen_height, coordinate_type
                )
                args["xOffset"] = x_abs * width_scale
                args["yOffset"] = y_abs * height_scale
                updated = True
            except ValueError:
                pass

        if updated:
            reconstructed_args = []
            for idx, param_name in enumerate(param_names):
                if param_name in args:
                    arg_value = args[param_name]
                    if isinstance(arg_value, str):
                        arg_repr = f"'{arg_value}'"
                    else:
                        arg_repr = str(arg_value)
                    reconstructed_args.append(arg_repr)
                else:
                    break

            used_params = set(param_names[: len(reconstructed_args)])
            for kw in parsed_keywords:
                if kw.arg not in used_params:
                    arg_value = args[kw.arg]
                    if isinstance(arg_value, str):
                        arg_repr = f"{kw.arg}='{arg_value}'"
                    else:
                        arg_repr = f"{kw.arg}={arg_value}"
                    reconstructed_args.append(arg_repr)

            new_args_str = ", ".join(reconstructed_args)
            new_full_call = f"{func_name}({new_args_str})"
            new_code = new_code.replace(full_call, new_full_call)

    return new_code
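
# Example, assuming a 1920x1080 logical screen and a model emitting relative
# coordinates in [0, 1]. Note that recognized x/y arguments are re-emitted
# positionally after projection:
#
#   _pyautogui_code_to_absolute_coordinates(
#       "pyautogui.click(x=0.5, y=0.5)", (1920, 1080), coordinate_type="relative"
#   )
#   -> "pyautogui.click(960, 540)"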


def split_args(args_str: str) -> List[str]:
    """Split an argument string into a list of top-level arguments."""
    args = []
    current_arg = ""
    within_string = False
    string_char = ""
    prev_char = ""
    for char in args_str:
        if char in ['"', "'"]:
            if not within_string:
                within_string = True
                string_char = char
            elif within_string and prev_char != "\\" and char == string_char:
                within_string = False
        if char == "," and not within_string:
            args.append(current_arg)
            current_arg = ""
        else:
            current_arg += char
        prev_char = char
    if current_arg:
        args.append(current_arg)
    return args
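
# Example: commas inside quoted strings are preserved, only top-level commas
# split the arguments (surrounding whitespace is kept as-is):
#
#   split_args("'hello, world', interval=0.5")
#   -> ["'hello, world'", " interval=0.5"]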


def correct_pyautogui_arguments(code: str) -> str:
    """Rewrite common incorrect keyword arguments in generated pyautogui calls."""
    function_corrections = {
        "write": {
            "incorrect_args": ["text", "content"],
            "correct_args": [],
            "keyword_arg": "message",
        },
        "press": {
            "incorrect_args": ["key", "button"],
            "correct_args": [],
            "keyword_arg": None,
        },
        "hotkey": {
            "incorrect_args": ["key1", "key2", "keys"],
            "correct_args": [],
            "keyword_arg": None,
        },
    }

    lines = code.strip().split("\n")
    corrected_lines = []

    for line in lines:
        line = line.strip()
        match = re.match(r"(pyautogui\.(\w+))\((.*)\)", line)
        if match:
            full_func_call = match.group(1)
            func_name = match.group(2)
            args_str = match.group(3)

            if func_name in function_corrections:
                func_info = function_corrections[func_name]
                args = split_args(args_str)
                corrected_args = []

                for arg in args:
                    arg = arg.strip()
                    kwarg_match = re.match(r"(\w+)\s*=\s*(.*)", arg)
                    if kwarg_match:
                        arg_name = kwarg_match.group(1)
                        arg_value = kwarg_match.group(2)

                        if arg_name in func_info["incorrect_args"]:
                            if func_info["keyword_arg"]:
                                corrected_args.append(
                                    f"{func_info['keyword_arg']}={arg_value}"
                                )
                            else:
                                corrected_args.append(arg_value)
                        else:
                            corrected_args.append(f"{arg_name}={arg_value}")
                    else:
                        corrected_args.append(arg)

                corrected_args_str = ", ".join(corrected_args)
                corrected_line = f"{full_func_call}({corrected_args_str})"
                corrected_lines.append(corrected_line)
            else:
                corrected_lines.append(line)
        else:
            corrected_lines.append(line)

    corrected_code = "\n".join(corrected_lines)
    return corrected_code
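
# Example: the corrections above rename or drop unsupported keyword names, e.g.
#
#   correct_pyautogui_arguments("pyautogui.write(text='hello')")
#   -> "pyautogui.write(message='hello')"
#
#   correct_pyautogui_arguments("pyautogui.press(key='enter')")
#   -> "pyautogui.press('enter')"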


def image_message_from_obs(obs, for_training=False):
    """Build the image content block of a chat message from an observation."""
    if not for_training:
        return {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/png;base64,{encode_image(obs['screenshot'])}",
                "detail": "high",
            },
        }
    else:
        return {"type": "image_url", "image_url": {"url": obs["screenshot_path"]}}
@ -1,736 +0,0 @@
|
|||
"""
|
||||
OpenCUA Agent Implementation
|
||||
|
||||
This module implements an OpenCUA agent for desktop automation tasks, building upon
|
||||
existing frameworks and integrating multiple coordinate mapping systems.
|
||||
|
||||
Framework and Implementation Sources:
|
||||
- Main framework structure follows: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/agent.py
|
||||
- Agent implementation adapted from: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/aguvis_agent.py
|
||||
- Qwen2.5-VL coordinate mapping from: https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py
|
||||
"""
|
||||
|
||||
import re
|
||||
import os
|
||||
import ast
|
||||
import time
|
||||
import math
|
||||
import httpx
|
||||
import base64
|
||||
import backoff
|
||||
from loguru import logger
|
||||
from typing import Dict, List, Tuple, Optional
|
||||
|
||||
# System prompts used in the training data
|
||||
AGNET_SYS_PROMPT_L1 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip()
|
||||
# AGNET_SYS_PROMPT_L2 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip()
|
||||
AGNET_SYS_PROMPT_L3 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nObservation:\n - Describe the current computer state based on the full screenshot in detail. \n - Application Context:\n - The active application\n - The active window or page\n - Overall layout and visible interface\n - Key Elements:\n - Menu items and toolbars \n - Buttons and controls\n - Text fields and content\n - Dialog boxes or popups\n - Error messages or notifications\n - Loading states\n - Other key elements\n - Describe any content, elements, options, information or clues that are possibly relevant to achieving the task goal, including their name, content, or shape (if possible).\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}\n".strip()
|
||||
|
||||
# Testing prompt on OSWorld-Verified
|
||||
AGNET_SYS_PROMPT_L2 = """You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task. The password of the computer is "osworld-public-evaluation". If the task is not possible to do, output the action computer.terminate(status='failure').
|
||||
|
||||
For each step, provide your response in this format:
|
||||
|
||||
Thought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning
|
||||
|
||||
Action:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize "—", maximize "□", close "X")\n - if the action involves keyboard actions like \'press\', \'write\', \'hotkey\':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions
|
||||
|
||||
Finally, output the action as PyAutoGUI code or the following functions:
|
||||
- {"name": "computer.triple_click", "description": "Triple click on the screen", "parameters": {"type": "object", "properties": {"x": {"type": "number", "description": "The x coordinate of the triple click"}, "y": {"type": "number", "description": "The y coordinate of the triple click"}}, "required": ["x", "y"]}}
|
||||
- {"name": "computer.terminate", "description": "Terminate the current task and report its completion status", "parameters": {"type": "object", "properties": {"status": {"type": "string", "enum": ["success", "failure"], "description": "The status of the task"}}, "required": ["status"]}}
|
||||
""".strip()
|
||||
|
||||
|
||||
STEP_TEMPLATE = "# Step {step_num}:\n"
|
||||
INSTRUTION_TEMPLATE = "# Task Instruction:\n{instruction}\n\nPlease generate the next move according to the screenshot, task instruction and previous steps (if provided).\n"
|
||||
|
||||
ACTION_HISTORY_TEMPLATE = "## Action:\n{action}\n"
|
||||
THOUGHT_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n"
|
||||
OBSERVATION_HISTORY_TEMPLATE = "## Observation:\n{observation}\n\n## Thought:\n{thought}\n\n## Action:\n{action}\n"
|
||||
DETAIL_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n\n## Code:\n{code}\n"
|
||||
|
||||
|
||||
def encode_image(image_content):
|
||||
"""Encode the image to base64"""
|
||||
return base64.b64encode(image_content).decode('utf-8')
|
||||
|
||||
def parse_response_to_cot_and_action(input_string, screen_size, coordinate_type) -> Tuple[str, List[str], dict]:
|
||||
"""Parse response including Observation, Thought, Action and code block"""
|
||||
try:
|
||||
sections = {}
|
||||
|
||||
obs_match = re.search(r'^##\s*Observation\s*:?[\n\r]+(.*?)(?=^##\s*Thought:|^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
|
||||
if obs_match:
|
||||
sections['observation'] = obs_match.group(1).strip()
|
||||
|
||||
thought_match = re.search(r'^##\s*Thought\s*:?[\n\r]+(.*?)(?=^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
|
||||
if thought_match:
|
||||
sections['thought'] = thought_match.group(1).strip()
|
||||
|
||||
action_match = re.search(r'^##\s*Action\s*:?[\n\r]+(.*?)(?=^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
|
||||
if action_match:
|
||||
action = action_match.group(1).strip()
|
||||
sections['action'] = action.strip()
|
||||
|
||||
if "computer.terminate" in input_string.lower():
|
||||
# Look for code blocks that might contain terminate command
|
||||
code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE)
|
||||
if code_blocks:
|
||||
last_code = code_blocks[-1].strip().lower()
|
||||
if "fail" in last_code:
|
||||
sections['code'] = "FAIL"
|
||||
return "FAIL", ["FAIL"], sections
|
||||
elif "success" in last_code:
|
||||
sections['code'] = "DONE"
|
||||
return "DONE", ["DONE"], sections
|
||||
# Default to DONE if terminate is mentioned but no specific status
|
||||
sections['code'] = "DONE"
|
||||
return "DONE", ["DONE"], sections
|
||||
|
||||
code_blocks = re.findall(r'```(?:python)\s*(.*?)\s*```', input_string, re.DOTALL)
|
||||
if code_blocks:
|
||||
code = code_blocks[-1].strip()
|
||||
sections['original_code'] = transform_agnet_action_to_code_block(code)
|
||||
corrected_code = correct_pyautogui_arguments(code)
|
||||
sections['code'] = corrected_code
|
||||
sections['code'] = project_coordinate_to_absolute_scale(corrected_code, screen_width=screen_size[0], screen_height=screen_size[1], coordinate_type=coordinate_type)
|
||||
else:
|
||||
# No code blocks found
|
||||
sections['code'] = "WAIT"
|
||||
return "WAIT", ["WAIT"], sections
|
||||
|
||||
if 'code' not in sections:
|
||||
logger.error("Missing required action or code section")
|
||||
return None, None, {}
|
||||
|
||||
if 'action' not in sections:
|
||||
sections['action'] = ""
|
||||
|
||||
return sections['action'], [sections['code']], sections
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Error parsing response: {str(e)}\nInput string: {input_string}")
|
||||
return None, None, {}
|
||||
|
||||
def correct_pyautogui_arguments(code: str) -> str:
|
||||
"""Correct the pyautogui arguments"""
|
||||
function_corrections = {
|
||||
'write': {
|
||||
'incorrect_args': ['text', 'content'],
|
||||
'correct_args': [],
|
||||
'keyword_arg': 'message'
|
||||
},
|
||||
'press': {
|
||||
'incorrect_args': ['key', 'button'],
|
||||
'correct_args': [],
|
||||
'keyword_arg': None
|
||||
},
|
||||
'hotkey': {
|
||||
'incorrect_args': ['key1', 'key2', 'keys'],
|
||||
'correct_args': [],
|
||||
'keyword_arg': None
|
||||
},
|
||||
}
|
||||
|
||||
lines = code.strip().split('\n')
|
||||
corrected_lines = []
|
||||
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
match = re.match(r'(pyautogui\.(\w+))\((.*)\)', line)
|
||||
if match:
|
||||
full_func_call = match.group(1)
|
||||
func_name = match.group(2)
|
||||
args_str = match.group(3)
|
||||
|
||||
if func_name in function_corrections:
|
||||
func_info = function_corrections[func_name]
|
||||
args = split_args(args_str)
|
||||
corrected_args = []
|
||||
|
||||
for arg in args:
|
||||
arg = arg.strip()
|
||||
kwarg_match = re.match(r'(\w+)\s*=\s*(.*)', arg)
|
||||
if kwarg_match:
|
||||
arg_name = kwarg_match.group(1)
|
||||
arg_value = kwarg_match.group(2)
|
||||
|
||||
if arg_name in func_info['incorrect_args']:
|
||||
if func_info['keyword_arg']:
|
||||
corrected_args.append(f"{func_info['keyword_arg']}={arg_value}")
|
||||
else:
|
||||
corrected_args.append(arg_value)
|
||||
else:
|
||||
corrected_args.append(f'{arg_name}={arg_value}')
|
||||
else:
|
||||
corrected_args.append(arg)
|
||||
|
||||
corrected_args_str = ', '.join(corrected_args)
|
||||
corrected_line = f'{full_func_call}({corrected_args_str})'
|
||||
corrected_lines.append(corrected_line)
|
||||
else:
|
||||
corrected_lines.append(line)
|
||||
else:
|
||||
corrected_lines.append(line)
|
||||
|
||||
corrected_code = '\n'.join(corrected_lines)
|
||||
return corrected_code
|
||||
|
||||
def split_args(args_str: str) -> List[str]:
|
||||
"""Split the arguments string into a list of arguments"""
|
||||
args = []
|
||||
current_arg = ''
|
||||
within_string = False
|
||||
string_char = ''
|
||||
prev_char = ''
|
||||
for char in args_str:
|
||||
if char in ['"', "'"]:
|
||||
if not within_string:
|
||||
within_string = True
|
||||
string_char = char
|
||||
elif within_string and prev_char != '\\' and char == string_char:
|
||||
within_string = False
|
||||
if char == ',' and not within_string:
|
||||
args.append(current_arg)
|
||||
current_arg = ''
|
||||
else:
|
||||
current_arg += char
|
||||
prev_char = char
|
||||
if current_arg:
|
||||
args.append(current_arg)
|
||||
return args
|
||||
|
||||
def smart_resize(
|
||||
height: int,
|
||||
width: int,
|
||||
factor: int,
|
||||
min_pixels: int,
|
||||
max_pixels: int,
|
||||
max_aspect_ratio_allowed: Optional[float] = None,
|
||||
size_can_be_smaller_than_factor: bool = False,
|
||||
):
|
||||
"""
|
||||
The function is modified from https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py
|
||||
|
||||
Qwen2.5-VL based model need this function to resize screenshots.
|
||||
|
||||
Rescales the image so that the following conditions are met:
|
||||
1. Both dimensions (height and width) are divisible by 'factor'.
|
||||
2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
|
||||
3. The aspect ratio of the image is maintained as closely as possible.
|
||||
|
||||
"""
|
||||
if not size_can_be_smaller_than_factor and (height < factor or width < factor):
|
||||
raise ValueError(
|
||||
f"height:{height} or width:{width} must be larger than factor:{factor} "
|
||||
f"(when size_can_be_smaller_than_factor is False)"
|
||||
)
|
||||
elif max_aspect_ratio_allowed is not None and max(height, width) / min(height, width) > max_aspect_ratio_allowed:
|
||||
raise ValueError(
|
||||
f"absolute aspect ratio must be smaller than {max_aspect_ratio_allowed}, "
|
||||
f"got {max(height, width) / min(height, width)}"
|
||||
f"(when max_aspect_ratio_allowed is not None)"
|
||||
)
|
||||
h_bar = max(1, round(height / factor)) * factor
|
||||
w_bar = max(1, round(width / factor)) * factor
|
||||
if h_bar * w_bar > max_pixels:
|
||||
beta = math.sqrt((height * width) / max_pixels)
|
||||
h_bar = max(1, math.floor(height / beta / factor)) * factor
|
||||
w_bar = max(1, math.floor(width / beta / factor)) * factor
|
||||
elif h_bar * w_bar < min_pixels:
|
||||
beta = math.sqrt(min_pixels / (height * width))
|
||||
h_bar = math.ceil(height * beta / factor) * factor
|
||||
w_bar = math.ceil(width * beta / factor) * factor
|
||||
return h_bar, w_bar
|
||||
|
||||
def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type):
|
||||
"""Project the coordinates to the absolute scale"""
|
||||
if coordinate_type == "relative":
|
||||
return int(round(x * screen_width)), int(round(y * screen_height))
|
||||
elif coordinate_type == "absolute":
|
||||
return x, y
|
||||
elif coordinate_type == "qwen25":
|
||||
if 0 <= x <= 1 and 0 <= y <= 1:
|
||||
# If already normalized, treat like "relative"
|
||||
return int(round(x * screen_width)), int(round(y * screen_height))
|
||||
|
||||
height, width = smart_resize(
|
||||
height=screen_height,
|
||||
width=screen_width,
|
||||
factor=28,
|
||||
min_pixels=3136,
|
||||
max_pixels=12845056 # We use this max_pixels setting in our training data
|
||||
)
|
||||
return int(x / width * screen_width), int(y / height * screen_height)
|
||||
else:
|
||||
raise ValueError(f"Unsupported coordinate type: {coordinate_type}")
|
||||
|
||||
def project_coordinate_to_absolute_scale(pyautogui_code_relative_coordinates, screen_width, screen_height, coordinate_type="relative"):
|
||||
"""Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size."""
|
||||
if coordinate_type not in ["relative", "relative1000", "absolute", "qwen25"]:
|
||||
raise ValueError(f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25'].")
|
||||
|
||||
pattern = r'(pyautogui\.\w+\([^\)]*\))'
|
||||
matches = re.findall(pattern, pyautogui_code_relative_coordinates)
|
||||
|
||||
new_code = pyautogui_code_relative_coordinates
|
||||
|
||||
for full_call in matches:
|
||||
func_name_pattern = r'(pyautogui\.\w+)\((.*)\)'
|
||||
func_match = re.match(func_name_pattern, full_call, re.DOTALL)
|
||||
if not func_match:
|
||||
continue
|
||||
|
||||
func_name = func_match.group(1)
|
||||
args_str = func_match.group(2)
|
||||
|
||||
try:
|
||||
parsed = ast.parse(f"func({args_str})").body[0].value
|
||||
parsed_args = parsed.args
|
||||
parsed_keywords = parsed.keywords
|
||||
except SyntaxError:
|
||||
return pyautogui_code_relative_coordinates
|
||||
|
||||
function_parameters = {
|
||||
'click': ['x', 'y', 'clicks', 'interval', 'button', 'duration', 'pause'],
|
||||
'moveTo': ['x', 'y', 'duration', 'tween', 'pause'],
|
||||
'moveRel': ['xOffset', 'yOffset', 'duration', 'tween', 'pause'],
|
||||
'dragTo': ['x', 'y', 'duration', 'button', 'mouseDownUp', 'pause'],
|
||||
'dragRel': ['xOffset', 'yOffset', 'duration', 'button', 'mouseDownUp', 'pause'],
|
||||
'doubleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'],
|
||||
}
|
||||
|
||||
func_base_name = func_name.split('.')[-1]
|
||||
|
||||
param_names = function_parameters.get(func_base_name, [])
|
||||
|
||||
args = {}
|
||||
for idx, arg in enumerate(parsed_args):
|
||||
if idx < len(param_names):
|
||||
param_name = param_names[idx]
|
||||
arg_value = ast.literal_eval(arg)
|
||||
args[param_name] = arg_value
|
||||
|
||||
try:
|
||||
for kw in parsed_keywords:
|
||||
param_name = kw.arg
|
||||
arg_value = ast.literal_eval(kw.value)
|
||||
args[param_name] = arg_value
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing keyword arguments: {e}")
|
||||
return pyautogui_code_relative_coordinates
|
||||
|
||||
updated = False
|
||||
if 'x' in args and 'y' in args:
|
||||
try:
|
||||
x_rel = float(args['x'])
|
||||
y_rel = float(args['y'])
|
||||
x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type)
|
||||
logger.warning(f"Projecting coordinates: ({x_rel}, {y_rel}) to ({x_abs}, {y_abs}) using {coordinate_type} projection.")
|
||||
args['x'] = x_abs
|
||||
args['y'] = y_abs
|
||||
updated = True
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if 'xOffset' in args and 'yOffset' in args:
|
||||
try:
|
||||
x_rel = float(args['xOffset'])
|
||||
y_rel = float(args['yOffset'])
|
||||
x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type)
|
||||
args['xOffset'] = x_abs
|
||||
args['yOffset'] = y_abs
|
||||
updated = True
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if updated:
|
||||
reconstructed_args = []
|
||||
for idx, param_name in enumerate(param_names):
|
||||
if param_name in args:
|
||||
arg_value = args[param_name]
|
||||
if isinstance(arg_value, str):
|
||||
arg_repr = f"'{arg_value}'"
|
||||
else:
|
||||
arg_repr = str(arg_value)
|
||||
reconstructed_args.append(arg_repr)
|
||||
else:
|
||||
break
|
||||
|
||||
used_params = set(param_names[:len(reconstructed_args)])
|
||||
for kw in parsed_keywords:
|
||||
if kw.arg not in used_params:
|
||||
arg_value = args[kw.arg]
|
||||
if isinstance(arg_value, str):
|
||||
arg_repr = f"{kw.arg}='{arg_value}'"
|
||||
else:
|
||||
arg_repr = f"{kw.arg}={arg_value}"
|
||||
reconstructed_args.append(arg_repr)
|
||||
|
||||
new_args_str = ', '.join(reconstructed_args)
|
||||
new_full_call = f"{func_name}({new_args_str})"
|
||||
new_code = new_code.replace(full_call, new_full_call)
|
||||
|
||||
return new_code
|
||||
|
||||
def extract_positions_and_instructions(code, action) -> list[dict]:
|
||||
"""
|
||||
Extracts all `(x, y)` coordinates (both positional and keyword arguments)
|
||||
and their associated preceding comments as instructions from Python code.
|
||||
If there are no comments, use the corresponding action instead.
|
||||
|
||||
Args:
|
||||
code (str): The Python code as a string.
|
||||
action (str): The low-level action as a string.
|
||||
|
||||
Returns:
|
||||
list[dict]: A list of dictionaries with extracted positions and instructions.
|
||||
- function (str): The pyautogui function name.
|
||||
- x (int or float): The x-coordinate.
|
||||
- y (int or float): The y-coordinate.
|
||||
- instruction (str): The preceding comment as an instruction.
|
||||
"""
|
||||
lines = code.splitlines()
|
||||
extracted = []
|
||||
preceding_comment = action # To store the preceding comment
|
||||
|
||||
for line in lines:
|
||||
preceding_comment = action
|
||||
# Check if the line is a comment and store it
|
||||
if line.strip().startswith("#"):
|
||||
preceding_comment = line.strip().lstrip("#").strip() # Clean the comment
|
||||
|
||||
# Match pyautogui functions with positional arguments
|
||||
match_positional = re.match(r"(pyautogui\.\w+)\((\d+(\.\d+)?),\s*(\d+(\.\d+)?).*?\)", line)
|
||||
if match_positional:
|
||||
extracted.append({
|
||||
"function": match_positional.group(1), # pyautogui function name
|
||||
"x": float(match_positional.group(2)) if '.' in match_positional.group(2)\
|
||||
else int(match_positional.group(2)), # x-coordinate
|
||||
"y": float(match_positional.group(4)) if '.' in match_positional.group(4)\
|
||||
else int(match_positional.group(3)), # y-coordinate
|
||||
"instruction": preceding_comment, # Use the preceding comment
|
||||
})
|
||||
preceding_comment = None # Reset after associating it with a line
|
||||
continue
|
||||
|
||||
# Match pyautogui functions with keyword arguments
|
||||
match_keyword = re.match(r"(pyautogui\.\w+)\(.*?x=(\d+(\.\d+)?),\s*y=(\d+(\.\d+)?).*?\)", line)
|
||||
if match_keyword:
|
||||
extracted.append({
|
||||
"function": match_keyword.group(1), # pyautogui function name
|
||||
"x": float(match_keyword.group(2)) if '.' in match_keyword.group(2)\
|
||||
else int(match_keyword.group(2)), # x-coordinate
|
||||
"y": float(match_keyword.group(4)) if '.' in match_keyword.group(4)\
|
||||
else int(match_keyword.group(3)), # y-coordinate
|
||||
"instruction": preceding_comment, # Use the preceding comment
|
||||
})
|
||||
preceding_comment = None # Reset after associating it with a line
|
||||
|
||||
logger.info(f"Grounding extracted:\n{extracted}")
|
||||
return extracted
|
||||
|
||||
def update_code_with_new_coordinates(code, updated_positions):
|
||||
"""
|
||||
Replaces old `(x, y)` coordinates (both positional and keyword arguments)
|
||||
with updated ones in the code, handling multiple occurrences correctly.
|
||||
|
||||
Args:
|
||||
code (str): The original Python code as a string.
|
||||
updated_positions (list): A list of dictionaries with updated positions.
|
||||
|
||||
Returns:
|
||||
str: The updated Python code.
|
||||
"""
|
||||
|
||||
lines = code.splitlines()
|
||||
updated_code_lines = []
|
||||
position_index = 0 # Tracks which position update to use
|
||||
|
||||
for line in lines:
|
||||
if position_index < len(updated_positions):
|
||||
# Get the next update position
|
||||
update = updated_positions[position_index]
|
||||
function_pattern_positional = rf"{update['function']}\(\d+(\.\d+)?, \d+(\.\d+)?"
|
||||
function_pattern_keyword = rf"{update['function']}\(.*?x=\d+(\.\d+)?, y=\d+(\.\d+)?"
|
||||
|
||||
if re.search(function_pattern_positional, line):
|
||||
# Replace positional arguments
|
||||
line = re.sub(
|
||||
function_pattern_positional,
|
||||
f"{update['function']}({update['x']}, {update['y']}",
|
||||
line,
|
||||
count=1
|
||||
)
|
||||
position_index += 1 # Move to the next update
|
||||
elif re.search(function_pattern_keyword, line):
|
||||
# Replace keyword arguments
|
||||
line = re.sub(
|
||||
function_pattern_keyword,
|
||||
f"{update['function']}(x={update['x']}, y={update['y']}",
|
||||
line,
|
||||
count=1
|
||||
)
|
||||
position_index += 1 # Move to the next update
|
||||
|
||||
updated_code_lines.append(line)
|
||||
|
||||
return "\n".join(updated_code_lines)
|
||||
|
||||
def transform_agnet_action_to_code_block(action):
|
||||
"""Transform the agent action to a code block: not used in agent, for logging only"""
|
||||
if "computer.terminate" in action or "browser.select_option" in action or "browser.clear" in action:
|
||||
return f"```code\n{action}\n```"
|
||||
else:
|
||||
return f"```python\n{action}\n```"
|
||||
|
||||
class OpenCUAAgent:
|
||||
"""
|
||||
OpenCUA Agent for desktop automation tasks.
|
||||
|
||||
This class implements a OpenCUA Model based agent that can observe
|
||||
desktop environments through screenshots and execute mouse/keyboard actions
|
||||
via PyAutoGUI to complete automation tasks.
|
||||
|
||||
Attributes:
|
||||
model (str): Name of the language model being used
|
||||
history_type (str): Type of history recording mechanism
|
||||
actions (list): History of executed actions
|
||||
observations (list): History of environment observations
|
||||
cots (list): Chain of thought reasoning records
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
model: str, # OpenCUA model name
|
||||
history_type: str, # History step type: action_history, thought_history, observation_history
|
||||
max_image_history_length: int = 3, # The max number of images in the history
|
||||
platform: str = "ubuntu", # The platform of the computer
|
||||
max_tokens: int = 1500, # The max number of tokens in the response
|
||||
top_p: float = 0.9, # The top p value in the response
|
||||
temperature: float = 0, # The temperature value in the response
|
||||
action_space: str = "pyautogui", # The action space: pyautogui
|
||||
observation_type: str = "screenshot", # The observation type: screenshot
|
||||
cot_level: str = "l2", # The CoT level: l1, l2, l3
|
||||
screen_size: Tuple[int, int] = (1920, 1080), # The screen size
|
||||
coordinate_type: str = "relative", # The coordinate type: relative, absolute, qwen25
|
||||
**kwargs
|
||||
):
|
||||
assert coordinate_type in ["relative", "absolute", "qwen25"]
|
||||
assert action_space in ["pyautogui"], "Invalid action space"
|
||||
assert observation_type in ["screenshot"], "Invalid observation type"
|
||||
assert history_type in ["action_history", "thought_history", "observation_history"]
|
||||
assert model is not None, "Model cannot be None"
|
||||
|
||||
self.model = model
|
||||
self.platform = platform
|
||||
self.max_tokens = max_tokens
|
||||
self.top_p = top_p
|
||||
self.temperature = temperature
|
||||
self.action_space = action_space
|
||||
self.observation_type = observation_type
|
||||
self.history_type = history_type
|
||||
self.coordinate_type = coordinate_type
|
||||
self.cot_level = cot_level
|
||||
self.screen_size = screen_size
|
||||
self.max_image_history_length = max_image_history_length
|
||||
|
||||
if history_type == "action_history":
|
||||
self.HISTORY_TEMPLATE = ACTION_HISTORY_TEMPLATE
|
||||
elif history_type == "thought_history":
|
||||
self.HISTORY_TEMPLATE = THOUGHT_HISTORY_TEMPLATE
|
||||
elif history_type == "observation_history":
|
||||
self.HISTORY_TEMPLATE = OBSERVATION_HISTORY_TEMPLATE
|
||||
else:
|
||||
raise ValueError(f"Invalid history type: {history_type}")
|
||||
|
||||
if cot_level == "l3":
|
||||
self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L3
|
||||
elif cot_level == "l2":
|
||||
self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L2
|
||||
elif cot_level == "l1":
|
||||
self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L1
|
||||
else:
|
||||
raise ValueError(f"Invalid COT level: {cot_level}")
|
||||
|
||||
self.actions = []
|
||||
self.observations = []
|
||||
self.cots = []
|
||||
|
||||
def reset(self, _logger=None):
|
||||
global logger
|
||||
logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent")
|
||||
|
||||
self.observations = []
|
||||
self.cots = []
|
||||
self.actions = []
|
||||
|
||||
def _scale_scroll_for_windows(self, code: str, factor: int = 50) -> str:
|
||||
""" pyautogui.scroll has a different scale on Ubuntu and Windows, multiple 'factor' when scrolling on Windows system"""
|
||||
if self.platform.lower() != "windows":
|
||||
return code
|
||||
|
||||
pattern_pos = re.compile(r'(pyautogui\.scroll\()\s*([-+]?\d+)\s*\)')
|
||||
code = pattern_pos.sub(lambda m: f"{m.group(1)}{int(m.group(2))*factor})", code)
|
||||
return code
|
||||
|
||||
def predict(self, instruction: str, obs: Dict, **kwargs) -> Tuple[str, List[str], Dict]:
|
||||
"""
|
||||
Predict the next action(s) based on the current observation.
|
||||
"""
|
||||
if "step_idx" in kwargs:
|
||||
logger.info(f"========= {self.model} Step {kwargs['step_idx']} =======")
|
||||
else:
|
||||
logger.info(f"========================== {self.model} ===================================")
|
||||
logger.info(f"Instruction: \n{instruction}")
|
||||
|
||||
messages = []
|
||||
messages.append({
|
||||
"role": "system",
|
||||
"content": self.SYSTEM_PROMPT
|
||||
})
|
||||
|
||||
history_step_texts = []
|
||||
for i in range(len(self.actions)):
|
||||
if i > len(self.actions) - self.max_image_history_length:
|
||||
messages.append({
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": f"data:image/png;base64,{encode_image(self.observations[i]['screenshot'])}"}
|
||||
}
|
||||
]
|
||||
})
|
||||
|
||||
history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format(
|
||||
observation=self.cots[i].get('observation'),
|
||||
thought=self.cots[i].get('thought'),
|
||||
action=self.cots[i].get('action')
|
||||
)
|
||||
|
||||
messages.append({
|
||||
"role": "assistant",
|
||||
"content": history_content
|
||||
})
|
||||
else:
|
||||
history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format(
|
||||
observation=self.cots[i].get('observation'),
|
||||
thought=self.cots[i].get('thought'),
|
||||
action=self.cots[i].get('action')
|
||||
)
|
||||
history_step_texts.append(history_content)
|
||||
if i == len(self.actions) - self.max_image_history_length:
|
||||
messages.append({
|
||||
"role":"assistant",
|
||||
"content": "\n".join(history_step_texts)
|
||||
})
|
||||
|
||||
messages.append({
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": f"data:image/png;base64,{encode_image(obs['screenshot'])}"}
|
||||
},
|
||||
{
|
||||
"type": "text",
|
||||
"text": INSTRUTION_TEMPLATE.format(instruction=instruction)
|
||||
}
|
||||
]
|
||||
})
|
||||
|
||||
response = self.call_llm({
|
||||
"model": self.model,
|
||||
"messages": messages,
|
||||
"max_tokens": self.max_tokens,
|
||||
"top_p": self.top_p,
|
||||
"temperature": self.temperature
|
||||
}, self.model)
|
||||
|
||||
logger.info(f"Model Output: \n{response}")
|
||||
if not response:
|
||||
logger.error("No response found in the response.")
|
||||
return "ERROR", ["DONE"], {}
|
||||
|
||||
low_level_instruction, pyautogui_actions, other_cot = parse_response_to_cot_and_action(response, self.screen_size, self.coordinate_type)
|
||||
if not pyautogui_actions or len(pyautogui_actions) == 0:
|
||||
logger.error("No pyautogui actions found in the response.")
|
||||
return response, ["FAIL"], {}
|
||||
|
||||
pyautogui_actions = [
|
||||
self._scale_scroll_for_windows(code) for code in pyautogui_actions
|
||||
]
|
||||
|
||||
self.observations.append(obs)
|
||||
logger.info(f"Parsed Low-level Action: \n{low_level_instruction}")
|
||||
logger.info(f"Parsed pyautogui Action: \n{pyautogui_actions}")
|
||||
|
||||
self.actions.append(low_level_instruction)
|
||||
if 'action' not in other_cot or not other_cot['action'] or 'thought' not in other_cot or not other_cot['thought']:
|
||||
logger.error("Error! no action/thought in cot")
|
||||
logger.error(f"response: {response}")
|
||||
logger.error(f"cot: {other_cot}")
|
||||
self.cots.append(other_cot)
|
||||
|
||||
# Print message structure if needed
|
||||
# messages_to_print = []
|
||||
# current_image = 1
|
||||
# for msg in messages:
|
||||
# msg_copy = copy.deepcopy(msg)
|
||||
# if isinstance(msg_copy['content'], list):
|
||||
# for content in msg_copy['content']:
|
||||
# if content['type'] == 'image_url':
|
||||
# content['image_url']['url'] = f'Image {current_image}'
|
||||
# current_image += 1
|
||||
# messages_to_print.append(msg_copy)
|
||||
|
||||
# messages_to_print.append({
|
||||
# "new_step_cot": other_cot,
|
||||
# "response": response
|
||||
# })
|
||||
# logger.info(json.dumps(messages_to_print, indent=2))
|
||||
logger.info(f"New step cot: {other_cot}")
|
||||
|
||||
return response, pyautogui_actions, {}
|
||||
|
||||
|
||||
@backoff.on_exception(
|
||||
backoff.constant,
|
||||
# here you should add more model exceptions as you want,
|
||||
# but you are forbidden to add "Exception", that is, a common type of exception
|
||||
# because we want to catch this kind of Exception in the outside to ensure
|
||||
# each example won't exceed the time limit
|
||||
(
|
||||
Exception
|
||||
),
|
||||
interval=30,
|
||||
max_tries=10
|
||||
)
|
||||
def call_llm(self, payload, model):
|
||||
"""Call the LLM API"""
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {os.environ['OPENCUA_API_KEY']}"
|
||||
}
|
||||
|
||||
for _ in range(30):
|
||||
response = httpx.post(
|
||||
os.environ['OPENCUA_URL'],
|
||||
headers=headers,
|
||||
json=payload,
|
||||
timeout=500,
|
||||
verify=False
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
logger.error("Failed to call LLM: " + response.text)
|
||||
logger.error("Retrying...")
|
||||
time.sleep(5)
|
||||
else:
|
||||
response = response.json()
|
||||
finish_reason = response["choices"][0].get("finish_reason")
|
||||
if finish_reason is not None and finish_reason == "stop": # for most of the time, length will not exceed max_tokens
|
||||
return response['choices'][0]['message']['content']
|
||||
else:
|
||||
logger.error("LLM did not finish properly, retrying...")
|
||||
time.sleep(5)
|
||||
|
|
@ -3,29 +3,34 @@

You should first host the OpenCUA model on your local machine or a server.

Command for OpenCUA-72B:
```
python run_multienv_opencua.py \
    --headless \
    --observation_type screenshot \
    --model OpenCUA-72B \
    --result_dir ./results\
    --test_all_meta_path evaluation_examples/test_nogdrive.json \
    --max_steps 100 \
    --num_envs 30 \
    --coordinate_type qwen25

```

Command for OpenCUA-7B and OpenCUA-32B:
```
python run_multienv_opencua.py \
    --headless \
    --observation_type screenshot \
    --model OpenCUA-32B \
    --result_dir ./results --test_all_meta_path evaluation_examples/test_all_no_gdrive.json \
    --max_steps 100 \
    --num_envs 30 \
    --coordinate_type qwen25
```

Command for OpenCUA-Qwen2-7B and OpenCUA-A3B:
```
python run_multienv_opencua.py \
    --headless \
    --observation_type screenshot \
    --model OpenCUA-A3B \
    --result_dir ./results \
    --result_dir ./results\
    --test_all_meta_path evaluation_examples/test_nogdrive.json \
    --max_steps 100 \
    --num_envs 10 \
    --coordinate_type relative
    --num_envs 30 \
    --coordinate_type qwen25 \
    --use_old_sys_prompt

```

"""

@ -44,7 +49,7 @@ from multiprocessing import Process, Manager
from multiprocessing import current_process
import lib_run_single
from desktop_env.desktop_env import DesktopEnv
from mm_agents.opencua_agent import OpenCUAAgent
from mm_agents.opencua import OpenCUAAgent

# Global variables for signal handling
active_environments = []

@ -76,8 +81,8 @@ def config() -> argparse.Namespace:
        default="screenshot",
        help="Observation type",
    )
    parser.add_argument("--sleep_after_execution", type=float, default=3.0)
    parser.add_argument("--max_steps", type=int, default=15)
    parser.add_argument("--sleep_after_execution", type=float, default=5.0)
    parser.add_argument("--max_steps", type=int, default=100)

    # evaluation config
    parser.add_argument(

@ -85,7 +90,7 @@ def config() -> argparse.Namespace:
    )

    # lm config
    parser.add_argument("--model", type=str, default="opencua")
    parser.add_argument("--model", type=str, default=None)
    parser.add_argument("--temperature", type=float, default=0)
    parser.add_argument("--top_p", type=float, default=0.9)
    parser.add_argument("--max_tokens", type=int, default=2048)

@ -94,13 +99,14 @@ def config() -> argparse.Namespace:
    # OpenCUAagent config
    parser.add_argument("--cot_level", type=str, default="l2", help="CoT version: l1, l2, l3. Default is l2 includes 'thought' and 'action'")
    parser.add_argument("--history_type", type=str, default="action_history", help="Use action to represent history steps", choices=["action_history", "thought_history", "observation_history"])
    parser.add_argument("--coordinate_type", type=str, default="relative", help="Type of coordinate: Qwen2-VL or Kimi-VL based models use 'relative'; Qwen2.5-VL based models use 'qwen25'", choices=["relative", "qwen25"])
    parser.add_argument("--coordinate_type", type=str, default="qwen25", help="Type of coordinate: Qwen2-VL or Kimi-VL based models use 'relative'; Qwen2.5-VL based models use 'qwen25'", choices=["relative", "qwen25"])
    parser.add_argument("--max_image_history_length", type=int, default=3, help="The max number of images in the history.")

    parser.add_argument("--use_old_sys_prompt", action="store_true", help="Use the old system prompt for OpenCUA-7B and OpenCUA-32B")

    # example config
    parser.add_argument("--domain", type=str, default="all")
    parser.add_argument(
        "--test_all_meta_path", type=str, default="evaluation_examples/test_all.json"
        "--test_all_meta_path", type=str, default="evaluation_examples/test_nogdrive.json"
    )

    # logging related

@ -124,6 +130,9 @@ def config() -> argparse.Namespace:
    parser.add_argument(
        "--screen_height", type=int, default=1080, help="Screen height"
    )
    parser.add_argument(
        "--password", type=str, default="osworld-public-evaluation", help="The password for the computer if needed"
    )
    args = parser.parse_args()
    return args

@ -253,6 +262,9 @@ def run_env_tasks(task_queue: Queue, args: argparse.Namespace, shared_scores: li
        screen_size=(args.screen_width, args.screen_height),
        coordinate_type=args.coordinate_type,
        max_image_history_length=args.max_image_history_length,
        max_steps=args.max_steps,
        use_old_sys_prompt=args.use_old_sys_prompt,
        password=args.password,
    )
    try:
        lib_run_single.run_single_example_opencua(