Compare commits

...

13 Commits

Author SHA1 Message Date
XinyuanWangCS 29c423de56 change provider ip 2025-10-13 02:36:33 +00:00
XinyuanWangCS 63cb699fc0 update opencua72b agent 2025-10-13 02:30:40 +00:00
XinyuanWangCS b41640285e Merge remote-tracking branch 'origin' into opencua-72b 2025-10-13 02:25:10 +00:00
XinyuanWangCS 2ce1ab31b5 update 2025-10-11 15:11:23 +00:00
XinyuanWangCS a5b15885ab update 2025-09-29 15:47:40 +00:00
XinyuanWangCS cdd7640044 update password 2025-09-27 14:03:05 +00:00
XinyuanWangCS c36c2e36d9 OpenCUA-72B 2025-09-27 03:53:27 +00:00
XinyuanWangCS c487414018 merge main 2025-09-26 17:27:05 +00:00
Jiaqi 4babaf30b3 Merge branch 'main' into jq/dev 2025-07-27 05:21:49 +00:00
Jiaqi f998aca0b5 merge main into jq/dev 2025-07-26 15:27:43 +00:00
Jiaqi 826c0ef945 Merge branch 'main' into jq/dev 2025-07-24 08:13:07 +00:00
Jiaqi 80b80617c4 os task fix: set the default dim screen time to be 300s 2025-07-24 08:12:45 +00:00
Jiaqi 0f1ef6d9b7 use aws pub ip 2025-07-24 06:04:01 +00:00
7 changed files with 1345 additions and 761 deletions

View File

@@ -55,6 +55,7 @@ def run_single_example(agent, env, example, max_steps, instruction, args, exampl
logger.info("The episode is done.")
break
step_idx += 1
time.sleep(20) # Wait for the environment to settle
result = env.evaluate()
logger.info("Result: %.2f", result)
scores.append(result)
@@ -186,23 +187,25 @@ def run_single_example_opencua(agent, env, example, max_steps, instruction, args
"wb") as _f:
_f.write(obs['screenshot'])
with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f:
with open(os.path.join(example_result_dir, "traj.jsonl"), "a", encoding="utf-8") as f:
f.write(json.dumps({
"step_num": step_idx + 1,
"action_timestamp": action_timestamp,
"action": action,
"natural_language_action": info_dict.get("action"),
"action_timestamp": action_timestamp,
"response": response,
"reward": reward,
"done": done,
"info": info,
"screenshot_file": f"step_{step_idx + 1}_{action_timestamp}.png"
}))
}, ensure_ascii=False))
f.write("\n")
if done:
logger.info("The episode is done.")
break
step_idx += 1
time.sleep(20) # Wait for the environment to settle
result = env.evaluate()
logger.info("Result: %.2f", result)
scores.append(result)

View File

@@ -0,0 +1,3 @@
from mm_agents.opencua.opencua_agent import OpenCUAAgent
__all__ = ["OpenCUAAgent"]

View File

@@ -0,0 +1,470 @@
"""
OpenCUA Agent Implementation
This module implements an OpenCUA agent for desktop automation tasks, building upon
existing frameworks and integrating multiple coordinate mapping systems.
Framework and Implementation Sources:
- Main framework structure follows: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/agent.py
- Agent implementation adapted from: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/aguvis_agent.py
- Qwen2.5-VL coordinate mapping from: https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py
"""
import re
import os
import ast
import time
import math
import httpx
import base64
import backoff
import logging
import traceback
from loguru import logger
from typing import Dict, List, Tuple, Optional
from mm_agents.opencua.utils import (
encode_image,
smart_resize,
)
from mm_agents.opencua.prompts import (
INSTRUTION_TEMPLATE,
STEP_TEMPLATE,
ACTION_HISTORY_TEMPLATE,
THOUGHT_HISTORY_TEMPLATE,
OBSERVATION_HISTORY_TEMPLATE,
# OpenCUA-7B, 32B system prompts
SYSTEM_PROMPT_V1_L1,
SYSTEM_PROMPT_V1_L2,
SYSTEM_PROMPT_V1_L3,
# OpenCUA-72B system prompts
build_sys_prompt,
)
def parse_response_to_cot_and_action(input_string, screen_size, coordinate_type) -> Tuple[str, List[str], dict]:
"""Parse response including Observation, Thought, Action and code block"""
sections = {}
try:
obs_match = re.search(r'^##\s*Observation\s*:?[\n\r]+(.*?)(?=^##\s*Thought:|^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
if obs_match:
sections['observation'] = obs_match.group(1).strip()
thought_match = re.search(r'^##\s*Thought\s*:?[\n\r]+(.*?)(?=^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
if thought_match:
sections['thought'] = thought_match.group(1).strip()
action_match = re.search(r'^##\s*Action\s*:?[\n\r]+(.*?)(?=^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
if action_match:
action = action_match.group(1).strip()
sections['action'] = action.strip()
code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE)
if not code_blocks:
logger.error("No code blocks found in the input string")
return f"<Error>: no code blocks found in the input string: {input_string}", ["FAIL"], sections
code_block = code_blocks[-1].strip()
sections['original_code'] = code_block
if "computer.wait" in code_block.lower():
sections["code"] = "WAIT"
return sections['action'], ["WAIT"], sections
elif "computer.terminate" in code_block.lower():
lower_block = code_block.lower()
if ("failure" in lower_block) or ("fail" in lower_block):
sections['code'] = "FAIL"
return code_block, ["FAIL"], sections
elif "success" in lower_block:
sections['code'] = "DONE"
return code_block, ["DONE"], sections
else:
logger.error("Terminate action found but no specific status provided in code block")
return f"<Error>: terminate action found but no specific status provided in code block: {input_string}", ["FAIL"], sections
# corrected_code = correct_pyautogui_arguments(code_block)
corrected_code = code_block
sections['code'] = corrected_code
sections['code'] = project_coordinate_to_absolute_scale(corrected_code, screen_width=screen_size[0], screen_height=screen_size[1], coordinate_type=coordinate_type)
if ('code' not in sections or sections['code'] is None or sections['code'] == "") or ('action' not in sections or sections['action'] is None or sections['action'] == ""):
logger.error("Missing required action or code section")
return f"<Error>: no code parsed: {input_string}", ["FAIL"], sections
return sections['action'], [sections['code']], sections
except Exception as e:
error_message = f"<Error>: parsing response: {str(e)}\nTraceback:\n{traceback.format_exc()}\nInput string: {input_string}"
logger.error(error_message)
return error_message, ['FAIL'], sections
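# Illustrative sketch (not part of the original diff) of how the parser above handles a
# typical model response with "relative" coordinates on a 1920x1080 screen; the response
# text below is invented for demonstration:
# _example = (
#     "## Thought:\nI should confirm the dialog.\n"
#     "## Action:\nClick the OK button in the center of the dialog.\n"
#     "## Code:\n```python\npyautogui.click(x=0.5, y=0.5)\n```"
# )
# _action, _codes, _sections = parse_response_to_cot_and_action(
#     _example, screen_size=(1920, 1080), coordinate_type="relative"
# )
# # _action -> "Click the OK button in the center of the dialog."
# # _codes  -> ["pyautogui.click(960, 540)"]  (0.5 * 1920 and 0.5 * 1080, emitted positionally)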
def project_coordinate_to_absolute_scale(pyautogui_code_relative_coordinates, screen_width, screen_height, coordinate_type="relative"):
"""
Convert model-space coordinates in the pyautogui code to absolute screen coordinates based on the logical screen size and the coordinate type.
"""
def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type):
if coordinate_type == "relative":
return int(round(x * screen_width)), int(round(y * screen_height))
elif coordinate_type == "absolute":
return int(round(x)), int(round(y))
elif coordinate_type == "qwen25":
height, width = smart_resize(
height=screen_height,
width=screen_width,
factor=28,
min_pixels=3136,
max_pixels=12845056
)
if 0 <= x <= 1 and 0 <= y <= 1:
# If already normalized, treat like "relative"
return int(round(x * width)), int(round(y * height))
return int(x / width * screen_width), int(y / height * screen_height)
else:
raise ValueError(f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25'].")
pattern = r'(pyautogui\.\w+\([^\)]*\))'
matches = re.findall(pattern, pyautogui_code_relative_coordinates)
new_code = pyautogui_code_relative_coordinates
for full_call in matches:
func_name_pattern = r'(pyautogui\.\w+)\((.*)\)'
func_match = re.match(func_name_pattern, full_call, re.DOTALL)
if not func_match:
continue
func_name = func_match.group(1)
args_str = func_match.group(2)
try:
parsed = ast.parse(f"func({args_str})").body[0].value
parsed_args = parsed.args
parsed_keywords = parsed.keywords
except SyntaxError:
return pyautogui_code_relative_coordinates
function_parameters = {
'click': ['x', 'y', 'clicks', 'interval', 'button', 'duration', 'pause'],
'rightClick': ['x', 'y', 'duration', 'tween', 'pause'],
'middleClick': ['x', 'y', 'duration', 'tween', 'pause'],
'doubleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'],
'tripleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'],
'moveTo': ['x', 'y', 'duration', 'tween', 'pause'],
'dragTo': ['x', 'y', 'duration', 'button', 'mouseDownUp', 'pause'],
}
func_base_name = func_name.split('.')[-1]
param_names = function_parameters.get(func_base_name, [])
args = {}
for idx, arg in enumerate(parsed_args):
if idx < len(param_names):
param_name = param_names[idx]
arg_value = ast.literal_eval(arg)
args[param_name] = arg_value
try:
for kw in parsed_keywords:
param_name = kw.arg
arg_value = ast.literal_eval(kw.value)
args[param_name] = arg_value
except Exception as e:
logger.error(f"Error parsing keyword arguments: {e}")
return pyautogui_code_relative_coordinates
updated = False
if 'x' in args and 'y' in args:
try:
x_rel = float(args['x'])
y_rel = float(args['y'])
x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type)
logger.warning(f"Projecting coordinates: ({x_rel}, {y_rel}) to ({x_abs}, {y_abs}) using {coordinate_type} projection.")
args['x'] = x_abs
args['y'] = y_abs
updated = True
except ValueError:
pass
if updated:
reconstructed_args = []
for idx, param_name in enumerate(param_names):
if param_name in args:
arg_value = args[param_name]
if isinstance(arg_value, str):
arg_repr = f"'{arg_value}'"
else:
arg_repr = str(arg_value)
reconstructed_args.append(arg_repr)
else:
break
used_params = set(param_names[:len(reconstructed_args)])
for kw in parsed_keywords:
if kw.arg not in used_params:
arg_value = args[kw.arg]
if isinstance(arg_value, str):
arg_repr = f"{kw.arg}='{arg_value}'"
else:
arg_repr = f"{kw.arg}={arg_value}"
reconstructed_args.append(arg_repr)
new_args_str = ', '.join(reconstructed_args)
new_full_call = f"{func_name}({new_args_str})"
new_code = new_code.replace(full_call, new_full_call)
return new_code
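# Hedged worked example for the "qwen25" branch above (illustrative only): for a 1920x1080
# screen, smart_resize(height=1080, width=1920, factor=28, min_pixels=3136,
# max_pixels=12845056) returns (1092, 1932), so a model-space click is projected back to
# screen pixels:
# project_coordinate_to_absolute_scale(
#     "pyautogui.click(x=966, y=546)", 1920, 1080, coordinate_type="qwen25"
# )  # -> "pyautogui.click(960, 540)"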
def transform_agnet_action_to_code_block(action):
if any(keyword in action for keyword in ["computer.terminate", "computer.wait", "browser.select_option", "browser.clear"]):
return f"```code\n{action}\n```"
else:
return f"```python\n{action}\n```"
class OpenCUAAgent:
"""
OpenCUA Agent for desktop automation tasks.
This class implements an OpenCUA-model-based agent that can observe
desktop environments through screenshots and execute mouse/keyboard actions
via PyAutoGUI to complete automation tasks.
Attributes:
model (str): Name of the language model being used
history_type (str): Type of history recording mechanism
actions (list): History of executed actions
observations (list): History of environment observations
cots (list): Chain of thought reasoning records
"""
def __init__(
self,
model: str, # OpenCUA model name
history_type: str, # History step type: action_history, thought_history, observation_history
max_steps: int, # The max number of steps to finish the task
max_image_history_length: int = 3, # The max number of images in the history
platform: str = "ubuntu", # The platform of the computer
max_tokens: int = 1500, # The max number of tokens in the response
top_p: float = 0.9, # The top p value in the response
temperature: float = 0, # The temperature value in the response
action_space: str = "pyautogui", # The action space: pyautogui
observation_type: str = "screenshot", # The observation type: screenshot
cot_level: str = "l2", # The CoT level: l1, l2, l3
screen_size: Tuple[int, int] = (1920, 1080), # The screen size
coordinate_type: str = "relative", # The coordinate type: relative, absolute, qwen25
use_old_sys_prompt: bool = False, # Whether to use the old system prompt
password="osworld-public-evaluation", # The password for the ubuntu platform
**kwargs
):
assert coordinate_type in ["relative", "absolute", "qwen25"]
assert action_space in ["pyautogui"], "Invalid action space"
assert observation_type in ["screenshot"], "Invalid observation type"
assert history_type in ["action_history", "thought_history", "observation_history"]
assert model is not None, "Model cannot be None"
self.model = model
self.platform = platform
self.max_tokens = max_tokens
self.top_p = top_p
self.temperature = temperature
self.action_space = action_space
self.observation_type = observation_type
self.history_type = history_type
self.coordinate_type = coordinate_type
self.cot_level = cot_level
self.screen_size = screen_size
self.max_image_history_length = max_image_history_length
self.max_steps = max_steps
self.password = password
if history_type == "action_history":
self.HISTORY_TEMPLATE = ACTION_HISTORY_TEMPLATE
elif history_type == "thought_history":
self.HISTORY_TEMPLATE = THOUGHT_HISTORY_TEMPLATE
elif history_type == "observation_history":
self.HISTORY_TEMPLATE = OBSERVATION_HISTORY_TEMPLATE
else:
raise ValueError(f"Invalid history type: {history_type}")
if use_old_sys_prompt:
if cot_level == "l1":
self.system_prompt = SYSTEM_PROMPT_V1_L1
elif cot_level == "l2":
self.system_prompt = SYSTEM_PROMPT_V1_L2
elif cot_level == "l3":
self.system_prompt = SYSTEM_PROMPT_V1_L3
else:
raise ValueError("Invalid cot_level. Choose from 'l1', 'l2', or 'l3'.")
else:
self.system_prompt = build_sys_prompt(
level=self.cot_level,
password=self.password,
use_random=False
)
self.actions = []
self.observations = []
self.cots = []
def reset(self, _logger=None):
global logger
logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent")
self.observations = []
self.cots = []
self.actions = []
def _scale_scroll_for_windows(self, code: str, factor: int = 50) -> str:
""" pyautogui.scroll has a different scale on Ubuntu and Windows, multiple 'factor' when scrolling on Windows system"""
if self.platform.lower() != "windows":
return code
pattern_pos = re.compile(r'(pyautogui\.scroll\()\s*([-+]?\d+)\s*\)')
code = pattern_pos.sub(lambda m: f"{m.group(1)}{int(m.group(2))*factor})", code)
return code
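# Hedged example of the Windows scroll rescaling above (illustrative only): with
# self.platform == "windows" and the default factor of 50, "pyautogui.scroll(-3)"
# becomes "pyautogui.scroll(-150)"; on Ubuntu the code is returned unchanged.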
def predict(self, instruction: str, obs: Dict, **kwargs) -> Tuple[str, List[str], Dict]:
"""
Predict the next action(s) based on the current observation.
"""
if "step_idx" in kwargs:
logger.info(f"========= {self.model} Step {kwargs['step_idx']} =======")
else:
logger.info(f"========================== {self.model} ===================================")
logger.info(f"Instruction: \n{instruction}")
messages = []
messages.append({
"role": "system",
"content": self.system_prompt
})
instruction_prompt = INSTRUTION_TEMPLATE.format(instruction=instruction)
history_step_texts = []
for i in range(len(self.actions)):
if i > len(self.actions) - self.max_image_history_length:
messages.append({
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{encode_image(self.observations[i]['screenshot'])}"}
}
]
})
history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format(
observation=self.cots[i].get('observation'),
thought=self.cots[i].get('thought'),
action=self.cots[i].get('action')
)
messages.append({
"role": "assistant",
"content": history_content
})
else:
history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format(
observation=self.cots[i].get('observation'),
thought=self.cots[i].get('thought'),
action=self.cots[i].get('action')
)
history_step_texts.append(history_content)
if i == len(self.actions) - self.max_image_history_length:
messages.append({
"role":"assistant",
"content": "\n".join(history_step_texts)
})
messages.append({
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{encode_image(obs['screenshot'])}"}
},
{
"type": "text",
"text": instruction_prompt
}
]
})
max_retry = 5
retry_count = 0
low_level_instruction = None
pyautogui_actions = None
other_cot = {}
while retry_count < max_retry:
try:
response = self.call_llm({
"model": self.model,
"messages": messages,
"max_tokens": self.max_tokens,
"top_p": self.top_p,
"temperature": self.temperature if retry_count==0 else max(0.2, self.temperature)
}, self.model)
logger.info(f"Model Output: \n{response}")
if not response:
logger.error("No response found in the response.")
raise ValueError(f"No response found in the response:\n{response}.")
low_level_instruction, pyautogui_actions, other_cot = parse_response_to_cot_and_action(response, self.screen_size, self.coordinate_type)
if "<Error>" in low_level_instruction or not pyautogui_actions:
logger.error(f"Error parsing response: {low_level_instruction}")
raise ValueError(f"Error parsing response: {low_level_instruction}")
break
except Exception as e:
logger.error(f"Error during message preparation: {e}")
retry_count += 1
if retry_count == max_retry:
logger.error("Maximum retries reached. Exiting.")
return str(e), ['FAIL'], other_cot
pyautogui_actions = [
self._scale_scroll_for_windows(code) for code in pyautogui_actions
]
logger.info(f"Action: \n{low_level_instruction}")
logger.info(f"Code: \n{pyautogui_actions}")
self.observations.append(obs)
self.actions.append(low_level_instruction)
self.cots.append(other_cot)
current_step = len(self.actions)
if current_step >= self.max_steps and 'computer.terminate' not in pyautogui_actions[0].lower():
logger.warning(f"Reached maximum steps {self.max_steps}. Forcing termination.")
low_level_instruction = 'Fail the task because the maximum step limit has been reached.'
pyautogui_actions = ['FAIL']
other_cot['code'] = 'FAIL'
return response, pyautogui_actions, other_cot
def call_llm(self, payload, model):
"""Call the LLM API"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {os.environ['OPENCUA_API_KEY']}"
}
for _ in range(20):
response = httpx.post(
f"https://{self.model}.app.msh.team/v1/chat/completions",
headers=headers,
json=payload,
timeout=500,
verify=False
)
if response.status_code != 200:
logger.error("Failed to call LLM: " + response.text)
logger.error("Retrying...")
time.sleep(5)
else:
response = response.json()
finish_reason = response["choices"][0].get("finish_reason")
if finish_reason is not None and finish_reason == "stop": # for most of the time, length will not exceed max_tokens
return response['choices'][0]['message']['content']
else:
logger.error("LLM did not finish properly, retrying...")
time.sleep(5)
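# Illustrative driver sketch (not part of the original diff). It assumes an OSWorld-style
# observation dict with raw PNG bytes under obs["screenshot"], a hypothetical deployment
# name for `model`, and OPENCUA_API_KEY being set in the environment for call_llm:
# agent = OpenCUAAgent(
#     model="opencua-72b",            # placeholder deployment name
#     history_type="thought_history",
#     max_steps=100,
#     cot_level="l2",
#     coordinate_type="qwen25",
#     screen_size=(1920, 1080),
# )
# agent.reset()
# png_bytes = b"..."  # raw screenshot bytes returned by the environment
# response, actions, cot = agent.predict("Open the terminal", {"screenshot": png_bytes})
# # `actions` holds one pyautogui snippet, or one of "WAIT", "DONE", "FAIL".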

View File

@@ -0,0 +1,349 @@
import random
# System prompt for OpenCUA-7B, OpenCUA-32B
# System prompts used in the training data
SYSTEM_PROMPT_V1_L1 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"\", maximize \"\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip()
SYSTEM_PROMPT_V1_L2 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"\", maximize \"\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip()
SYSTEM_PROMPT_V1_L3 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nObservation:\n - Describe the current computer state based on the full screenshot in detail. \n - Application Context:\n - The active application\n - The active window or page\n - Overall layout and visible interface\n - Key Elements:\n - Menu items and toolbars \n - Buttons and controls\n - Text fields and content\n - Dialog boxes or popups\n - Error messages or notifications\n - Loading states\n - Other key elements\n - Describe any content, elements, options, information or clues that are possibly relevant to achieving the task goal, including their name, content, or shape (if possible).\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"\", maximize \"\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}\n".strip()
# Testing prompt on OSWorld-Verified
SYSTEM_PROMPT_V1_L2 = """You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task. The password of the computer is "osworld-public-evaluation". If the task is not possible to do, output the action computer.terminate(status='failure').
For each step, provide your response in this format:
Thought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning
Action:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize "", maximize "", close "X")\n - if the action involves keyboard actions like \'press\', \'write\', \'hotkey\':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions
Finally, output the action as PyAutoGUI code or the following functions:
- {"name": "computer.triple_click", "description": "Triple click on the screen", "parameters": {"type": "object", "properties": {"x": {"type": "number", "description": "The x coordinate of the triple click"}, "y": {"type": "number", "description": "The y coordinate of the triple click"}}, "required": ["x", "y"]}}
- {"name": "computer.terminate", "description": "Terminate the current task and report its completion status", "parameters": {"type": "object", "properties": {"status": {"type": "string", "enum": ["success", "failure"], "description": "The status of the task"}}, "required": ["status"]}}
""".strip()
# SYSTEM_PROMPT for OpenCUA-72B
general_computer_instructions = [
"""
You are a GUI agent. You are given a task, a screenshot of the screen and your previous interactions with the computer. You need to perform a series of actions to complete the task. The password of the computer is "{password}", use it when you need sudo rights. You need to **wait** explicitly for installation, waiting website loading or running commands to finish. Don\'t terminate the task unless you are sure the task is finished. If you find that you can\'t finish the task, or the task is not finished exactly as the instruction indicates (you have made progress but not finished the task completely), or the task is impossible to complete, you must report **failure**.
""".strip(),
"""
You are acting as a GUI agent. A task description, a screenshot, and your past interactions will be supplied. Execute the necessary steps to fulfil the task. Whenever sudo operations are required, use the computer's password "{password}". Insert an explicit **wait** after launching any installation, website loading, or long-running command to let it finish. Do not output the terminate action unless you are certain the task is complete. If you realise the task cannot be finished or is impossible to do, you should report **failure**.
""".strip(),
"""
Your mission as a GUI agent is to complete the provided task using the current screen image and the history of interactions. For commands requiring elevated privileges, supply "{password}" as the sudo password. Explicitly invoke **wait** after launching any installation or command that may take time to finish. Do not terminate the session unless success is certain. If the task cannot be fully executed, or turns out impossible, you must declare **failure**.
""".strip(),
]
l3_format_instruction = """For each step, provide your response in this format:
# Step: {step number}
## Observation:
{observation}
## Thought:
{thought}
## Action:
{action}
## Code:
{code}"""
l2_format_instruction = """For each step, provide your response in this format:
# Step: {step number}
## Thought:
{thought}
## Action:
{action}
## Code:
{code}"""
l1_format_instruction = """For each step, provide your response in this format:
# Step: {step number}
## Action:
{action}
## Code:
{code}"""
observation_instructions = [
"""For the Observation section, you should include the following parts if helpful:
- Describe the current computer state based on the full screenshot in detail.
- Application Context:
- The active application
- The active window or page
- Overall layout and visible interface
- Key Elements:
- Menu items and toolbars
- Buttons and controls
- Text fields and content
- Dialog boxes or popups
- Error messages or notifications
- Loading states
- Other key elements
- Describe any content, elements, options, information or clues that are possibly relevant to achieving the task goal, including their name, content, or shape (if possible).
""".strip(),
"""In the Observation section, outline everything visible on screen that could influence your next move:
Current system state as seen in the screenshot.
Application context:
- Which application is running in the foreground
- Specific window, tab, or page being displayed
- High-level layout of panels, sidebars, and work areas
Salient interface elements:
- Menus, ribbons, and toolbars
- Actionable buttons, icons, toggles, and controls
- Input areas such as text boxes or code editors
- Pop-up dialogs, modals, alerts, or system notifications
- Progress bars, spinners, or other loading indicators
Any text, labels, shapes, or on-screen cues that might help accomplish the task (cite names or visual traits when available).
""".strip(),
# ── Variant 3 ──────────────────────────────────────────────────────────
"""Write the Observation section as a thorough snapshot of the UI:
- Start with a full-screen description: what the user sees at a glance.
- Give application details: title, active workspace, and structural layout.
- Enumerate critical elements:
* Navigation menus and context bars
* Primary and secondary buttons or icons
* Editable fields, lists, tables, or rich-text areas
* Dialogs, pop-ups, warnings, or confirmations
* Indicators of loading or processing activity
- Note any evidence, hints, or data (textual or visual) that could guide the task toward completion, referencing names, colors, shapes, or positions when explicit identifiers are missing.
""".strip(),
]
thought_instructions = [
"""For the Thought section, you should include the following parts:
- Reflection on the task when there is previous action:
- Consider the correctness of the previous action and its outcomes
- If the previous action was correct, describe the change in the state of the computer and reason
- If the previous action was incorrect, reflect on what went wrong and why
- Step by Step Progress Assessment:
- Add necessary information according to the history screenshots, former actions and current screenshot.
- Analyze what parts of the task have already been completed and how they contribute to the overall goal.
- Make a plan on how to complete the task based on the history and current screenshot.
- Next Action Prediction:
- Propose the most possible next action and state the reason
- For Text Input Actions:
- Note current cursor position
- Consolidate repetitive actions (specify count for multiple keypresses)
- Describe expected final text outcome
- Use first-person perspective in reasoning
""".strip(),
"""
In the **Thought** block, cover these topics:
1. **Last-Step Reflection** (when a prior action exists)
Was my previous action correct? What evidence shows this?
If it succeeded, what state change occurred and why?
If it failed, where did I go wrong?
2. **Incremental Progress Audit**
Which sub-tasks are completed and how do they advance the mission?
Make a plan to finish the task based on past actions and the current UI state.
3. **Foresight for the Coming Action**
Predict the most logical next step.
State the reason why it is the best choice given the current context.
4. **Guidance for Text Entry**
Note the cursor location
Compress multiple identical keystrokes (e.g., press Backspace ×3)
Clarify the exact text expected after input
Use first-person inner dialogue throughout.
""".strip(),
"""
Compose your **Thought** section as an internal monologue that includes:
- **Retrospective** (if a prior step exists):
* Evaluate the accuracy and effect of the last action.
* If it was successful, reason about the resulting interface change.
* If it was faulty, diagnose the misstep and its cause.
- **Ongoing Progress Evaluation**:
* Outline which parts of the task are done and their impact on the overall objective.
* Suggest a plan to complete the task based on past history and the current screen.
- **Decision Framework for the Next Move**:
* Brainstorm possible next action given the present state.
* Explain why this action is the most logical choice.
- **Special Rules for Keyboard Input**:
* Specify current cursor focus or field.
* Merge repeated keypresses into counts for brevity.
* Describe the intended final text after typing.
Maintain a first-person voice for clarity of reasoning.
""".strip(),
]
action_instructions = [
"""For the action section, you should provide clear, concise, and actionable instructions in one sentence.
- If the action involves interacting with a specific target:
- Describe target explicitly (if multiple elements share that name, you should distinguish the target) without using coordinates
- Specify element names when possible (use original language if non-English)
- Describe features (shape, color, position) if name unavailable
- If the action involves keyboard actions like 'press', 'write', 'hotkey':
- Consolidate repetitive keypresses with count
- Specify expected text outcome for typing actions
""".strip(),
"""
Write the **Action** in one short, direct sentence.
When clicking or otherwise interacting with a UI element:
- Name the element explicitly and, if multiple elements share that name, add a distinguishing detail.
- Do **not** give coordinates.
- Use the element's label (keep original language when it isn't English).
- If unnamed, describe recognisable traits (shape, colour, on-screen position).
When using the keyboard (press, type, hotkey):
- Collapse repeated key presses into counts.
- For typing, specify the text that should appear.
""".strip(),
"""
Provide the **Action** as a single, crisp imperative sentence.
- Mouse/GUI interactions:
* Identify the target by name, and if duplicate names exist, clarify which one you mean.
* Do not supply XY coordinates.
* Preserve non-English labels verbatim.
* If unnamed, describe the element's look or location (colour, shape, relative position).
- Keyboard operations (press, write, hotkey):
* Combine repeated keystrokes with a multiplier.
* State the exact text that will be entered.
""".strip(),
]
code_instrucion = """For the code section, you should output the corresponding code for the action. The code should be either PyAutoGUI code or one of the following functions warped in the code block:
- {"name": "computer.wait", "description": "Make the computer wait for 20 seconds for installation, running code, etc.", "parameters": {"type": "object", "properties": {}, "required": []}}
- {"name": "computer.terminate", "description": "Terminate the current task and report its completion status", "parameters": {"type": "object", "properties": {"status": {"type": "string", "enum": ["success", "failure"], "description": "The status of the task"}, {"answer": {"type": "string", "description": "The answer of the task"}}, "required": ["status"]}}
Examples for the code section:
```python
pyautogui.click(x=123, y=456)
```
```code
computer.terminate(status="success")
```
```code
computer.terminate(status="success", answer='''text''')
```"""
SYSTEM_PROMPT_V2_L1 = """
{general_computer_instruction}
{format_instruction}
{action_instruction}
{code_instruction}
""".strip()
SYSTEM_PROMPT_V2_L2 = """
{general_computer_instruction}
{format_instruction}
{thought_instruction}
{action_instruction}
{code_instruction}
""".strip()
SYSTEM_PROMPT_V2_L3 = """
{general_computer_instruction}
{format_instruction}
{observation_instruction}
{thought_instruction}
{action_instruction}
{code_instruction}
""".strip()
def build_sys_prompt(level, password="password", use_random=False):
if not use_random:
if level == "l1":
return SYSTEM_PROMPT_V2_L1.format(
general_computer_instruction=general_computer_instructions[0].format(
password=password
),
format_instruction=l1_format_instruction,
action_instruction=action_instructions[0],
code_instruction=code_instrucion,
)
elif level == "l2":
return SYSTEM_PROMPT_V2_L2.format(
general_computer_instruction=general_computer_instructions[0].format(
password=password
),
format_instruction=l2_format_instruction,
thought_instruction=thought_instructions[0],
action_instruction=action_instructions[0],
code_instruction=code_instrucion,
)
elif level == "l3":
return SYSTEM_PROMPT_V2_L3.format(
general_computer_instruction=general_computer_instructions[0].format(
password=password
),
format_instruction=l3_format_instruction,
observation_instruction=observation_instructions[0],
thought_instruction=thought_instructions[0],
action_instruction=action_instructions[0],
code_instruction=code_instrucion,
)
else:
raise ValueError("Invalid level. Choose from 'l1', 'l2', or 'l3'.")
else:
if level == "l1":
return SYSTEM_PROMPT_V2_L1.format(
general_computer_instruction=random.choice(
general_computer_instructions
),
format_instruction=l1_format_instruction,
action_instruction=random.choice(action_instructions),
code_instruction=code_instrucion,
)
elif level == "l2":
return SYSTEM_PROMPT_V2_L2.format(
general_computer_instruction=random.choice(
general_computer_instructions
),
format_instruction=l2_format_instruction,
thought_instruction=random.choice(thought_instructions),
action_instruction=random.choice(action_instructions),
code_instruction=code_instrucion,
)
elif level == "l3":
return SYSTEM_PROMPT_V2_L3.format(
general_computer_instruction=random.choice(
general_computer_instructions
),
format_instruction=l3_format_instruction,
observation_instruction=random.choice(observation_instructions),
thought_instruction=random.choice(thought_instructions),
action_instruction=random.choice(action_instructions),
code_instruction=code_instrucion,
)
else:
raise ValueError("Invalid level. Choose from 'l1', 'l2', or 'l3'.")
# Modeling prompt templates for generating trajectories
STEP_TEMPLATE = "# Step {step_num}:\n"
INSTRUTION_TEMPLATE = "# Task Instruction:\n{instruction}\n\nPlease generate the next move according to the screenshot, task instruction and previous steps (if provided).\n"
ACTION_HISTORY_TEMPLATE = "## Action:\n{action}\n"
THOUGHT_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n"
OBSERVATION_HISTORY_TEMPLATE = "## Observation:\n{observation}\n\n## Thought:\n{thought}\n\n## Action:\n{action}\n"
ACTION_HISTORY_TEMPLATE_WITH_CODE = "## Action:\n{action}\n\n## Code:\n{code}\n"
THOUGHT_HISTORY_TEMPLATE_WITH_CODE = "## Thought:\n{thought}\n\n## Action:\n{action}\n\n## Code:\n{code}\n"
OBSERVATION_HISTORY_TEMPLATE_WITH_CODE = "## Observation:\n{observation}\n\n## Thought:\n{thought}\n\n## Action:\n{action}\n\n## Code:\n{code}\n"
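# Illustrative sketch (not part of the original diff) of how these templates compose into a
# single history step and a system prompt; the thought/action strings are invented:
# step_text = STEP_TEMPLATE.format(step_num=1) + THOUGHT_HISTORY_TEMPLATE.format(
#     thought="The desktop is idle; I should open the Files application first.",
#     action="Double-click the Files icon in the dock.",
# )
# sys_prompt = build_sys_prompt(level="l2", password="osworld-public-evaluation")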

mm_agents/opencua/utils.py Normal file
View File

@@ -0,0 +1,483 @@
import re
import base64
from loguru import logger
from typing import List, Optional
from PIL import Image
from io import BytesIO
import tempfile
import os
import math
import json
import time
import requests
def encode_image(image_content):
return base64.b64encode(image_content).decode("utf-8")
def smart_resize(
height: int,
width: int,
factor: int = 28,
min_pixels: int = 56 * 56,
max_pixels: int = 14 * 14 * 4 * 1280,
max_aspect_ratio_allowed: Optional[float] = None,
size_can_be_smaller_than_factor: bool = False,
):
"""Rescales the image so that the following conditions are met:
1. Both dimensions (height and width) are divisible by 'factor'.
2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
3. The aspect ratio of the image is maintained as closely as possible.
"""
if not size_can_be_smaller_than_factor and (height < factor or width < factor):
raise ValueError(
f"height:{height} or width:{width} must be larger than factor:{factor} "
f"(when size_can_be_smaller_than_factor is False)"
)
elif (
max_aspect_ratio_allowed is not None
and max(height, width) / min(height, width) > max_aspect_ratio_allowed
):
raise ValueError(
f"absolute aspect ratio must be smaller than {max_aspect_ratio_allowed}, "
f"got {max(height, width) / min(height, width)}"
f"(when max_aspect_ratio_allowed is not None)"
)
h_bar = max(1, round(height / factor)) * factor
w_bar = max(1, round(width / factor)) * factor
if h_bar * w_bar > max_pixels:
beta = math.sqrt((height * width) / max_pixels)
h_bar = max(1, math.floor(height / beta / factor)) * factor
w_bar = max(1, math.floor(width / beta / factor)) * factor
elif h_bar * w_bar < min_pixels:
beta = math.sqrt(min_pixels / (height * width))
h_bar = math.ceil(height * beta / factor) * factor
w_bar = math.ceil(width * beta / factor) * factor
return h_bar, w_bar
def call_openai_naive(model, payload, address_hint=None):
"""
Naive OpenAI API call using requests.
"""
# Extract fields from payload
model = payload.get("model")
payload["model"] = model.model_id if hasattr(model, "model_id") else "None"
# address_hint not used here
base_url = model.base_url
# logger.warning(f"Base URL: {base_url}, Payload model: {payload['model']}")
url = f"{base_url}/chat/completions"
headers = {
"Content-Type": "application/json",
}
data = {
**payload,
"n": 1,
}
max_retry = 5
chat_completions = None
success = False
while success is False and max_retry > 0:
try:
json_data = json.dumps(data)
response = requests.post(
url, headers=headers, data=json_data, timeout=120, verify=False
)
if response.status_code == 200:
chat_completions = response.json()
try:
finish_reason = chat_completions["choices"][0].get("finish_reason")
if (
finish_reason is not None and finish_reason == "stop"
): # for most of the time, length will not exceed max_tokens
success = True
else:
time.sleep(5)
max_retry -= 1
except Exception as e:
logger.error(f"Error in processing chat completion: {e}")
time.sleep(5)
max_retry -= 1
else:
logger.error(f"Failed to call OpenAI API: {response.text}")
time.sleep(5)
max_retry -= 1
except requests.exceptions.ReadTimeout:
# timeout is normal, don't print trace
max_retry -= 1
logger.warning(f"Timeout in OpenAI API call, left retries: {max_retry}")
time.sleep(5)
except Exception as e:
max_retry -= 1
logger.exception(f"Failed to call OpenAI API: {e}")
time.sleep(5)
if chat_completions is None:
raise RuntimeError("Failed to call OpenAI API, max_retry used up")
try:
infos = {}
if "choices" in chat_completions:
infos["finish_reason"] = chat_completions["choices"][0].get("finish_reason")
infos["n"] = len(chat_completions["choices"])
if "tool_calls" in chat_completions["choices"][0]["message"]:
infos["tool_calls"] = chat_completions["choices"][0]["message"][
"tool_calls"
]
infos["choices"] = chat_completions["choices"] # for the case of n > 1
if "usage" in chat_completions:
infos["usage"] = chat_completions["usage"]
return chat_completions["choices"][0]["message"]["content"], infos
except Exception as e:
logger.error(f"Error in processing chat completion {e}")
return "", {"n": 1, "usage": 0, "finish_reason": f"error {e}"}
def preprocess_for_naive_openai(self, payload):
if isinstance(payload["model"], str):
payload["model"] = getattr(self, "openai_client", None)
return payload
def encoded_img_to_pil_img(data_str):
base64_str = data_str.replace("data:image/png;base64,", "")
image_data = base64.b64decode(base64_str)
return Image.open(BytesIO(image_data))
def save_to_tmp_img_file(data_str):
base64_str = data_str.replace("data:image/png;base64,", "")
image_data = base64.b64decode(base64_str)
image = Image.open(BytesIO(image_data))
tmp_img_path = os.path.join(tempfile.mkdtemp(), "tmp_img.png")
image.save(tmp_img_path)
return tmp_img_path
def bbox_to_center_1000(bbox: str) -> tuple[int, int]:
regex_list = [
r"<\|box_start\|>\((\d+),(\d+)\),\((\d+),(\d+)\)<\|box_end\|>", # '<|box_start|>(576,12),(592,42)<|box_end|>'
r"<\|box_start\|>\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]\]<\|box_end\|>", # '<|box_start|>[[576, 12, 592, 42]]<|box_end|>'
r"<\|box_start\|>\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]<\|box_end\|>", # '<|box_start|>[[576, 12, 592, 42]<|box_end|>', this is actually wrong format, but we parse it anyway
r"<\|box_start\|>\((\d+),\s*(\d+),\s*(\d+),\s*(\d+)\)<\|box_end\|>", # '<|box_start|>(576, 12, 592, 42)<|box_end|>', this is actually wrong format, but we parse it anyway
r"\((\d+),(\d+)\),\((\d+),(\d+)\)", # Versions without the 'bbox' special tokens
r"\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]\]",
r"\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]",
r"\((\d+),\s*(\d+),\s*(\d+),\s*(\d+)\)",
]
for regex in regex_list:
match = re.search(regex, bbox)
if match:
break
if not match:
raise ValueError(
f"Bounding box coordinates not found in the input string: {bbox}"
)
x_top_left, y_top_left, x_bottom_right, y_bottom_right = map(int, match.groups())
x_center = (x_top_left + x_bottom_right) // 2
y_center = (y_top_left + y_bottom_right) // 2
return x_center, y_center
def bbox_to_center_1(bbox: str) -> tuple[int, int]:
regex_list = [
r"\[\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*\]",
]
for regex in regex_list:
match = re.search(regex, bbox)
if match:
break
if not match:
raise ValueError(
f"Bounding box coordinates not found in the input string: {bbox}"
)
coordinates = tuple(map(float, match.groups()))
coordinates = [int(coord * 1000) for coord in coordinates]
x_center = (coordinates[0] + coordinates[2]) // 2
y_center = (coordinates[1] + coordinates[3]) // 2
return x_center, y_center
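# Hedged examples for the two bbox helpers above (illustrative only):
# bbox_to_center_1000("<|box_start|>(576,12),(592,42)<|box_end|>")  # -> (584, 27)
# bbox_to_center_1("[0.10, 0.20, 0.30, 0.40]")                      # -> (200, 300)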
def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type):
if coordinate_type == "relative":
return int(round(x * screen_width)), int(round(y * screen_height))
elif coordinate_type == "absolute":
return x, y
elif coordinate_type == "qwen25":
height, width = smart_resize(
height=screen_height,
width=screen_width,
factor=28,
min_pixels=3136,
max_pixels=12845056,
)
return int(x / width * screen_width), int(y / height * screen_height)
elif coordinate_type == "relative1000":
if screen_width == 0 or screen_height == 0:
raise ValueError(
"Screen width and height must be greater than zero for relative1000 coordinates."
)
x_abs = int(round(x * screen_width / 1000))
y_abs = int(round(y * screen_height / 1000))
return x_abs, y_abs
else:
raise ValueError(f"Unsupported coordinate type: {coordinate_type}")
def rescale_coord(
coord: tuple[int, int],
original_width: int,
original_height: int,
scaled_width=1000,
scaled_height=1000,
) -> tuple[int, int]:
# According to https://huggingface.co/spaces/maxiw/OS-ATLAS/blob/398c3256a4fec409a074e0e4b5ac1d1d5bf7c240/app.py#L36
# It seems that the OS-ATLAS model outputs coordinates rescaled to a 1000x1000 image
# So we need to rescale the coordinates back to the original image size
x_scale = original_width / scaled_width
y_scale = original_height / scaled_height
return int(coord[0] * x_scale), int(coord[1] * y_scale)
def _pyautogui_code_to_absolute_coordinates(
pyautogui_code_relative_coordinates,
logical_screen_size,
coordinate_type="relative",
model_input_size=None,
):
"""
Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.
"""
import re
import ast
if coordinate_type not in ["relative", "relative1000", "absolute", "qwen25"]:
raise ValueError(
f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25']."
)
screen_width, screen_height = logical_screen_size
if model_input_size is not None:
model_width, model_height = model_input_size
width_scale, height_scale = (
screen_width / model_width,
screen_height / model_height,
)
else:
width_scale, height_scale = 1, 1
pattern = r"(pyautogui\.\w+\([^\)]*\))"
matches = re.findall(pattern, pyautogui_code_relative_coordinates)
new_code = pyautogui_code_relative_coordinates
for full_call in matches:
func_name_pattern = r"(pyautogui\.\w+)\((.*)\)"
func_match = re.match(func_name_pattern, full_call, re.DOTALL)
if not func_match:
continue
func_name = func_match.group(1)
args_str = func_match.group(2)
try:
parsed = ast.parse(f"func({args_str})").body[0].value
parsed_args = parsed.args
parsed_keywords = parsed.keywords
except SyntaxError:
return pyautogui_code_relative_coordinates
function_parameters = {
"click": ["x", "y", "clicks", "interval", "button", "duration", "pause"],
"moveTo": ["x", "y", "duration", "tween", "pause"],
"moveRel": ["xOffset", "yOffset", "duration", "tween", "pause"],
"dragTo": ["x", "y", "duration", "button", "mouseDownUp", "pause"],
"dragRel": [
"xOffset",
"yOffset",
"duration",
"button",
"mouseDownUp",
"pause",
],
"doubleClick": ["x", "y", "interval", "button", "duration", "pause"],
}
func_base_name = func_name.split(".")[-1]
param_names = function_parameters.get(func_base_name, [])
args = {}
for idx, arg in enumerate(parsed_args):
if idx < len(param_names):
param_name = param_names[idx]
arg_value = ast.literal_eval(arg)
args[param_name] = arg_value
try:
for kw in parsed_keywords:
param_name = kw.arg
arg_value = ast.literal_eval(kw.value)
args[param_name] = arg_value
except Exception as e:
logger.error(f"Error parsing keyword arguments: {e}")
return pyautogui_code_relative_coordinates
updated = False
if "x" in args and "y" in args:
try:
x_rel = float(args["x"])
y_rel = float(args["y"])
x_abs, y_abs = _coordinate_projection(
x_rel, y_rel, screen_width, screen_height, coordinate_type
)
# logger.warning(f"Projecting coordinates: ({x_rel}, {y_rel}) to ({x_abs}, {y_abs}) using {coordinate_type} projection.")
args["x"] = x_abs * width_scale
args["y"] = y_abs * height_scale
updated = True
except ValueError:
pass
if "xOffset" in args and "yOffset" in args:
try:
x_rel = float(args["xOffset"])
y_rel = float(args["yOffset"])
x_abs, y_abs = _coordinate_projection(
x_rel, y_rel, screen_width, screen_height, coordinate_type
)
args["xOffset"] = x_abs * width_scale
args["yOffset"] = y_abs * height_scale
updated = True
except ValueError:
pass
if updated:
reconstructed_args = []
for idx, param_name in enumerate(param_names):
if param_name in args:
arg_value = args[param_name]
if isinstance(arg_value, str):
arg_repr = f"'{arg_value}'"
else:
arg_repr = str(arg_value)
reconstructed_args.append(arg_repr)
else:
break
used_params = set(param_names[: len(reconstructed_args)])
for kw in parsed_keywords:
if kw.arg not in used_params:
arg_value = args[kw.arg]
if isinstance(arg_value, str):
arg_repr = f"{kw.arg}='{arg_value}'"
else:
arg_repr = f"{kw.arg}={arg_value}"
reconstructed_args.append(arg_repr)
new_args_str = ", ".join(reconstructed_args)
new_full_call = f"{func_name}({new_args_str})"
new_code = new_code.replace(full_call, new_full_call)
return new_code
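# Hedged worked example for the "relative1000" branch above (illustrative only):
# _pyautogui_code_to_absolute_coordinates(
#     "pyautogui.click(x=500, y=500)", (1920, 1080), coordinate_type="relative1000"
# )  # -> "pyautogui.click(960, 540)"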
def split_args(args_str: str) -> List[str]:
args = []
current_arg = ""
within_string = False
string_char = ""
prev_char = ""
for char in args_str:
if char in ['"', "'"]:
if not within_string:
within_string = True
string_char = char
elif within_string and prev_char != "\\" and char == string_char:
within_string = False
if char == "," and not within_string:
args.append(current_arg)
current_arg = ""
else:
current_arg += char
prev_char = char
if current_arg:
args.append(current_arg)
return args
def correct_pyautogui_arguments(code: str) -> str:
function_corrections = {
"write": {
"incorrect_args": ["text", "content"],
"correct_args": [],
"keyword_arg": "message",
},
"press": {
"incorrect_args": ["key", "button"],
"correct_args": [],
"keyword_arg": None,
},
"hotkey": {
"incorrect_args": ["key1", "key2", "keys"],
"correct_args": [],
"keyword_arg": None,
},
}
lines = code.strip().split("\n")
corrected_lines = []
for line in lines:
line = line.strip()
match = re.match(r"(pyautogui\.(\w+))\((.*)\)", line)
if match:
full_func_call = match.group(1)
func_name = match.group(2)
args_str = match.group(3)
if func_name in function_corrections:
func_info = function_corrections[func_name]
args = split_args(args_str)
corrected_args = []
for arg in args:
arg = arg.strip()
kwarg_match = re.match(r"(\w+)\s*=\s*(.*)", arg)
if kwarg_match:
arg_name = kwarg_match.group(1)
arg_value = kwarg_match.group(2)
if arg_name in func_info["incorrect_args"]:
if func_info["keyword_arg"]:
corrected_args.append(
f"{func_info['keyword_arg']}={arg_value}"
)
else:
corrected_args.append(arg_value)
else:
corrected_args.append(f"{arg_name}={arg_value}")
else:
corrected_args.append(arg)
corrected_args_str = ", ".join(corrected_args)
corrected_line = f"{full_func_call}({corrected_args_str})"
corrected_lines.append(corrected_line)
else:
corrected_lines.append(line)
else:
corrected_lines.append(line)
corrected_code = "\n".join(corrected_lines)
return corrected_code
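# Hedged examples of the argument correction above (illustrative only):
# correct_pyautogui_arguments("pyautogui.write(text='hello')")  # -> "pyautogui.write(message='hello')"
# correct_pyautogui_arguments("pyautogui.press(key='enter')")   # -> "pyautogui.press('enter')"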
def image_message_from_obs(obs, for_training=False):
if not for_training:
return {
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{encode_image(obs['screenshot'])}",
"detail": "high",
},
}
else:
return {"type": "image_url", "image_url": {"url": obs["screenshot_path"]}}

View File

@@ -1,736 +0,0 @@
"""
OpenCUA Agent Implementation
This module implements an OpenCUA agent for desktop automation tasks, building upon
existing frameworks and integrating multiple coordinate mapping systems.
Framework and Implementation Sources:
- Main framework structure follows: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/agent.py
- Agent implementation adapted from: https://github.com/xlang-ai/OSWorld/blob/main/mm_agents/aguvis_agent.py
- Qwen2.5-VL coordinate mapping from: https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py
"""
import re
import os
import ast
import time
import math
import httpx
import base64
import backoff
from loguru import logger
from typing import Dict, List, Tuple, Optional
# System prompts used in the training data
AGNET_SYS_PROMPT_L1 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"\", maximize \"\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip()
# AGNET_SYS_PROMPT_L2 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"—\", maximize \"□\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}".strip()
AGNET_SYS_PROMPT_L3 = "You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task.\n\nFor each step, provide your response in this format:\n\nObservation:\n - Describe the current computer state based on the full screenshot in detail. \n - Application Context:\n - The active application\n - The active window or page\n - Overall layout and visible interface\n - Key Elements:\n - Menu items and toolbars \n - Buttons and controls\n - Text fields and content\n - Dialog boxes or popups\n - Error messages or notifications\n - Loading states\n - Other key elements\n - Describe any content, elements, options, information or clues that are possibly relevant to achieving the task goal, including their name, content, or shape (if possible).\n\nThought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning\n\nAction:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize \"\", maximize \"\", close \"X\")\n - if the action involves keyboard actions like 'press', 'write', 'hotkey':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions\n\nFinally, output the action as PyAutoGUI code or the following functions:\n- {\"name\": \"computer.triple_click\", \"description\": \"Triple click on the screen\", \"parameters\": {\"type\": \"object\", \"properties\": {\"x\": {\"type\": \"number\", \"description\": \"The x coordinate of the triple click\"}, \"y\": {\"type\": \"number\", \"description\": \"The y coordinate of the triple click\"}}, \"required\": [\"x\", \"y\"]}}\n- {\"name\": \"computer.terminate\", \"description\": \"Terminate the current task and report its completion status\", \"parameters\": {\"type\": \"object\", \"properties\": {\"status\": {\"type\": \"string\", \"enum\": [\"success\", \"fail\"], \"description\": \"The status of the task\"}}, \"required\": [\"status\"]}}\n".strip()
# Testing prompt on OSWorld-Verified
AGNET_SYS_PROMPT_L2 = """You are a GUI agent. You are given a task and a screenshot of the screen. You need to perform a series of pyautogui actions to complete the task. The password of the computer is "osworld-public-evaluation". If the task is not possible to do, output the action computer.terminate(status='failure').
For each step, provide your response in this format:
Thought:\n - Step by Step Progress Assessment:\n - Analyze completed task parts and their contribution to the overall goal\n - Reflect on potential errors, unexpected results, or obstacles\n - If previous action was incorrect, predict a logical recovery step\n - Next Action Analysis:\n - List possible next actions based on current state\n - Evaluate options considering current state and previous actions\n - Propose most logical next action\n - Anticipate consequences of the proposed action\n - For Text Input Actions:\n - Note current cursor position\n - Consolidate repetitive actions (specify count for multiple keypresses)\n - Describe expected final text outcome\n - Use first-person perspective in reasoning
Action:\n Provide clear, concise, and actionable instructions:\n - If the action involves interacting with a specific target:\n - Describe target explicitly without using coordinates\n - Specify element names when possible (use original language if non-English)\n - Describe features (shape, color, position) if name unavailable\n - For window control buttons, identify correctly (minimize "", maximize "", close "X")\n - if the action involves keyboard actions like \'press\', \'write\', \'hotkey\':\n - Consolidate repetitive keypresses with count\n - Specify expected text outcome for typing actions
Finally, output the action as PyAutoGUI code or the following functions:
- {"name": "computer.triple_click", "description": "Triple click on the screen", "parameters": {"type": "object", "properties": {"x": {"type": "number", "description": "The x coordinate of the triple click"}, "y": {"type": "number", "description": "The y coordinate of the triple click"}}, "required": ["x", "y"]}}
- {"name": "computer.terminate", "description": "Terminate the current task and report its completion status", "parameters": {"type": "object", "properties": {"status": {"type": "string", "enum": ["success", "failure"], "description": "The status of the task"}}, "required": ["status"]}}
""".strip()
STEP_TEMPLATE = "# Step {step_num}:\n"
INSTRUTION_TEMPLATE = "# Task Instruction:\n{instruction}\n\nPlease generate the next move according to the screenshot, task instruction and previous steps (if provided).\n"
ACTION_HISTORY_TEMPLATE = "## Action:\n{action}\n"
THOUGHT_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n"
OBSERVATION_HISTORY_TEMPLATE = "## Observation:\n{observation}\n\n## Thought:\n{thought}\n\n## Action:\n{action}\n"
DETAIL_HISTORY_TEMPLATE = "## Thought:\n{thought}\n\n## Action:\n{action}\n\n## Code:\n{code}\n"
def encode_image(image_content):
"""Encode the image to base64"""
return base64.b64encode(image_content).decode('utf-8')
def parse_response_to_cot_and_action(input_string, screen_size, coordinate_type) -> Tuple[str, List[str], dict]:
"""Parse response including Observation, Thought, Action and code block"""
try:
sections = {}
obs_match = re.search(r'^##\s*Observation\s*:?[\n\r]+(.*?)(?=^##\s*Thought:|^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
if obs_match:
sections['observation'] = obs_match.group(1).strip()
thought_match = re.search(r'^##\s*Thought\s*:?[\n\r]+(.*?)(?=^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
if thought_match:
sections['thought'] = thought_match.group(1).strip()
action_match = re.search(r'^##\s*Action\s*:?[\n\r]+(.*?)(?=^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
if action_match:
action = action_match.group(1).strip()
sections['action'] = action.strip()
if "computer.terminate" in input_string.lower():
# Look for code blocks that might contain terminate command
code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE)
if code_blocks:
last_code = code_blocks[-1].strip().lower()
if "fail" in last_code:
sections['code'] = "FAIL"
return "FAIL", ["FAIL"], sections
elif "success" in last_code:
sections['code'] = "DONE"
return "DONE", ["DONE"], sections
# Default to DONE if terminate is mentioned but no specific status
sections['code'] = "DONE"
return "DONE", ["DONE"], sections
code_blocks = re.findall(r'```(?:python)\s*(.*?)\s*```', input_string, re.DOTALL)
if code_blocks:
code = code_blocks[-1].strip()
sections['original_code'] = transform_agnet_action_to_code_block(code)
corrected_code = correct_pyautogui_arguments(code)
sections['code'] = corrected_code
sections['code'] = project_coordinate_to_absolute_scale(corrected_code, screen_width=screen_size[0], screen_height=screen_size[1], coordinate_type=coordinate_type)
else:
# No code blocks found
sections['code'] = "WAIT"
return "WAIT", ["WAIT"], sections
if 'code' not in sections:
logger.error("Missing required action or code section")
return None, None, {}
if 'action' not in sections:
sections['action'] = ""
return sections['action'], [sections['code']], sections
except Exception as e:
logger.exception(f"Error parsing response: {str(e)}\nInput string: {input_string}")
return None, None, {}
def correct_pyautogui_arguments(code: str) -> str:
"""Correct the pyautogui arguments"""
function_corrections = {
'write': {
'incorrect_args': ['text', 'content'],
'correct_args': [],
'keyword_arg': 'message'
},
'press': {
'incorrect_args': ['key', 'button'],
'correct_args': [],
'keyword_arg': None
},
'hotkey': {
'incorrect_args': ['key1', 'key2', 'keys'],
'correct_args': [],
'keyword_arg': None
},
}
lines = code.strip().split('\n')
corrected_lines = []
for line in lines:
line = line.strip()
match = re.match(r'(pyautogui\.(\w+))\((.*)\)', line)
if match:
full_func_call = match.group(1)
func_name = match.group(2)
args_str = match.group(3)
if func_name in function_corrections:
func_info = function_corrections[func_name]
args = split_args(args_str)
corrected_args = []
for arg in args:
arg = arg.strip()
kwarg_match = re.match(r'(\w+)\s*=\s*(.*)', arg)
if kwarg_match:
arg_name = kwarg_match.group(1)
arg_value = kwarg_match.group(2)
if arg_name in func_info['incorrect_args']:
if func_info['keyword_arg']:
corrected_args.append(f"{func_info['keyword_arg']}={arg_value}")
else:
corrected_args.append(arg_value)
else:
corrected_args.append(f'{arg_name}={arg_value}')
else:
corrected_args.append(arg)
corrected_args_str = ', '.join(corrected_args)
corrected_line = f'{full_func_call}({corrected_args_str})'
corrected_lines.append(corrected_line)
else:
corrected_lines.append(line)
else:
corrected_lines.append(line)
corrected_code = '\n'.join(corrected_lines)
return corrected_code
def split_args(args_str: str) -> List[str]:
"""Split the arguments string into a list of arguments"""
args = []
current_arg = ''
within_string = False
string_char = ''
prev_char = ''
for char in args_str:
if char in ['"', "'"]:
if not within_string:
within_string = True
string_char = char
elif within_string and prev_char != '\\' and char == string_char:
within_string = False
if char == ',' and not within_string:
args.append(current_arg)
current_arg = ''
else:
current_arg += char
prev_char = char
if current_arg:
args.append(current_arg)
return args
def smart_resize(
height: int,
width: int,
factor: int,
min_pixels: int,
max_pixels: int,
max_aspect_ratio_allowed: Optional[float] = None,
size_can_be_smaller_than_factor: bool = False,
):
"""
The function is modified from https://github.com/QwenLM/Qwen2.5-VL/blob/main/qwen-vl-utils/src/qwen_vl_utils/vision_process.py
Qwen2.5-VL based models need this function to resize screenshots.
Rescales the image so that the following conditions are met:
1. Both dimensions (height and width) are divisible by 'factor'.
2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
3. The aspect ratio of the image is maintained as closely as possible.
"""
if not size_can_be_smaller_than_factor and (height < factor or width < factor):
raise ValueError(
f"height:{height} or width:{width} must be larger than factor:{factor} "
f"(when size_can_be_smaller_than_factor is False)"
)
elif max_aspect_ratio_allowed is not None and max(height, width) / min(height, width) > max_aspect_ratio_allowed:
raise ValueError(
f"absolute aspect ratio must be smaller than {max_aspect_ratio_allowed}, "
f"got {max(height, width) / min(height, width)}"
f"(when max_aspect_ratio_allowed is not None)"
)
h_bar = max(1, round(height / factor)) * factor
w_bar = max(1, round(width / factor)) * factor
if h_bar * w_bar > max_pixels:
beta = math.sqrt((height * width) / max_pixels)
h_bar = max(1, math.floor(height / beta / factor)) * factor
w_bar = max(1, math.floor(width / beta / factor)) * factor
elif h_bar * w_bar < min_pixels:
beta = math.sqrt(min_pixels / (height * width))
h_bar = math.ceil(height * beta / factor) * factor
w_bar = math.ceil(width * beta / factor) * factor
return h_bar, w_bar
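# Editor's illustrative note (not part of the original file): with the values used
# below (factor=28, min_pixels=3136, max_pixels=12845056), a 1920x1080 screenshot
# gives smart_resize(1080, 1920, ...) == (1092, 1932): both sides are rounded to
# multiples of 28 and the resulting pixel count already lies inside the allowed range.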
def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type):
"""Project the coordinates to the absolute scale"""
if coordinate_type == "relative":
return int(round(x * screen_width)), int(round(y * screen_height))
elif coordinate_type == "absolute":
return x, y
elif coordinate_type == "qwen25":
if 0 <= x <= 1 and 0 <= y <= 1:
# If already normalized, treat like "relative"
return int(round(x * screen_width)), int(round(y * screen_height))
height, width = smart_resize(
height=screen_height,
width=screen_width,
factor=28,
min_pixels=3136,
max_pixels=12845056 # We use this max_pixels setting in our training data
)
return int(x / width * screen_width), int(y / height * screen_height)
else:
raise ValueError(f"Unsupported coordinate type: {coordinate_type}")
def project_coordinate_to_absolute_scale(pyautogui_code_relative_coordinates, screen_width, screen_height, coordinate_type="relative"):
"""Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size."""
if coordinate_type not in ["relative", "relative1000", "absolute", "qwen25"]:
raise ValueError(f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25'].")
pattern = r'(pyautogui\.\w+\([^\)]*\))'
matches = re.findall(pattern, pyautogui_code_relative_coordinates)
new_code = pyautogui_code_relative_coordinates
for full_call in matches:
func_name_pattern = r'(pyautogui\.\w+)\((.*)\)'
func_match = re.match(func_name_pattern, full_call, re.DOTALL)
if not func_match:
continue
func_name = func_match.group(1)
args_str = func_match.group(2)
try:
parsed = ast.parse(f"func({args_str})").body[0].value
parsed_args = parsed.args
parsed_keywords = parsed.keywords
except SyntaxError:
return pyautogui_code_relative_coordinates
function_parameters = {
'click': ['x', 'y', 'clicks', 'interval', 'button', 'duration', 'pause'],
'moveTo': ['x', 'y', 'duration', 'tween', 'pause'],
'moveRel': ['xOffset', 'yOffset', 'duration', 'tween', 'pause'],
'dragTo': ['x', 'y', 'duration', 'button', 'mouseDownUp', 'pause'],
'dragRel': ['xOffset', 'yOffset', 'duration', 'button', 'mouseDownUp', 'pause'],
'doubleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'],
}
func_base_name = func_name.split('.')[-1]
param_names = function_parameters.get(func_base_name, [])
args = {}
for idx, arg in enumerate(parsed_args):
if idx < len(param_names):
param_name = param_names[idx]
arg_value = ast.literal_eval(arg)
args[param_name] = arg_value
try:
for kw in parsed_keywords:
param_name = kw.arg
arg_value = ast.literal_eval(kw.value)
args[param_name] = arg_value
except Exception as e:
logger.error(f"Error parsing keyword arguments: {e}")
return pyautogui_code_relative_coordinates
updated = False
if 'x' in args and 'y' in args:
try:
x_rel = float(args['x'])
y_rel = float(args['y'])
x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type)
logger.warning(f"Projecting coordinates: ({x_rel}, {y_rel}) to ({x_abs}, {y_abs}) using {coordinate_type} projection.")
args['x'] = x_abs
args['y'] = y_abs
updated = True
except ValueError:
pass
if 'xOffset' in args and 'yOffset' in args:
try:
x_rel = float(args['xOffset'])
y_rel = float(args['yOffset'])
x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type)
args['xOffset'] = x_abs
args['yOffset'] = y_abs
updated = True
except ValueError:
pass
if updated:
reconstructed_args = []
for idx, param_name in enumerate(param_names):
if param_name in args:
arg_value = args[param_name]
if isinstance(arg_value, str):
arg_repr = f"'{arg_value}'"
else:
arg_repr = str(arg_value)
reconstructed_args.append(arg_repr)
else:
break
used_params = set(param_names[:len(reconstructed_args)])
for kw in parsed_keywords:
if kw.arg not in used_params:
arg_value = args[kw.arg]
if isinstance(arg_value, str):
arg_repr = f"{kw.arg}='{arg_value}'"
else:
arg_repr = f"{kw.arg}={arg_value}"
reconstructed_args.append(arg_repr)
new_args_str = ', '.join(reconstructed_args)
new_full_call = f"{func_name}({new_args_str})"
new_code = new_code.replace(full_call, new_full_call)
return new_code
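# Editor's illustrative note: with screen_width=1920, screen_height=1080 and
# coordinate_type="relative", the call
#   pyautogui.click(x=0.25, y=0.5)
# is rewritten to
#   pyautogui.click(480, 540)
# (arguments are re-emitted positionally); xOffset/yOffset pairs are handled the same
# way for moveRel/dragRel.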
def extract_positions_and_instructions(code, action) -> list[dict]:
"""
Extracts all `(x, y)` coordinates (both positional and keyword arguments)
and their associated preceding comments as instructions from Python code.
If there are no comments, use the corresponding action instead.
Args:
code (str): The Python code as a string.
action (str): The low-level action as a string.
Returns:
list[dict]: A list of dictionaries with extracted positions and instructions.
- function (str): The pyautogui function name.
- x (int or float): The x-coordinate.
- y (int or float): The y-coordinate.
- instruction (str): The preceding comment as an instruction.
"""
lines = code.splitlines()
extracted = []
preceding_comment = None  # Most recent standalone comment line, if any
for line in lines:
# Check if the line is a comment and store it
if line.strip().startswith("#"):
preceding_comment = line.strip().lstrip("#").strip() # Clean the comment
# Match pyautogui functions with positional arguments
match_positional = re.match(r"(pyautogui\.\w+)\((\d+(\.\d+)?),\s*(\d+(\.\d+)?).*?\)", line)
if match_positional:
extracted.append({
"function": match_positional.group(1), # pyautogui function name
"x": float(match_positional.group(2)) if '.' in match_positional.group(2)\
else int(match_positional.group(2)), # x-coordinate
"y": float(match_positional.group(4)) if '.' in match_positional.group(4)\
else int(match_positional.group(3)), # y-coordinate
"instruction": preceding_comment, # Use the preceding comment
})
preceding_comment = None # Reset after associating it with a line
continue
# Match pyautogui functions with keyword arguments
match_keyword = re.match(r"(pyautogui\.\w+)\(.*?x=(\d+(\.\d+)?),\s*y=(\d+(\.\d+)?).*?\)", line)
if match_keyword:
extracted.append({
"function": match_keyword.group(1), # pyautogui function name
"x": float(match_keyword.group(2)) if '.' in match_keyword.group(2)\
else int(match_keyword.group(2)), # x-coordinate
"y": float(match_keyword.group(4)) if '.' in match_keyword.group(4)\
else int(match_keyword.group(3)), # y-coordinate
"instruction": preceding_comment, # Use the preceding comment
})
preceding_comment = None # Reset after associating it with a line
logger.info(f"Grounding extracted:\n{extracted}")
return extracted
def update_code_with_new_coordinates(code, updated_positions):
"""
Replaces old `(x, y)` coordinates (both positional and keyword arguments)
with updated ones in the code, handling multiple occurrences correctly.
Args:
code (str): The original Python code as a string.
updated_positions (list): A list of dictionaries with updated positions.
Returns:
str: The updated Python code.
"""
lines = code.splitlines()
updated_code_lines = []
position_index = 0 # Tracks which position update to use
for line in lines:
if position_index < len(updated_positions):
# Get the next update position
update = updated_positions[position_index]
function_pattern_positional = rf"{update['function']}\(\d+(\.\d+)?, \d+(\.\d+)?"
function_pattern_keyword = rf"{update['function']}\(.*?x=\d+(\.\d+)?, y=\d+(\.\d+)?"
if re.search(function_pattern_positional, line):
# Replace positional arguments
line = re.sub(
function_pattern_positional,
f"{update['function']}({update['x']}, {update['y']}",
line,
count=1
)
position_index += 1 # Move to the next update
elif re.search(function_pattern_keyword, line):
# Replace keyword arguments
line = re.sub(
function_pattern_keyword,
f"{update['function']}(x={update['x']}, y={update['y']}",
line,
count=1
)
position_index += 1 # Move to the next update
updated_code_lines.append(line)
return "\n".join(updated_code_lines)
def transform_agnet_action_to_code_block(action):
"""Transform the agent action to a code block: not used in agent, for logging only"""
if "computer.terminate" in action or "browser.select_option" in action or "browser.clear" in action:
return f"```code\n{action}\n```"
else:
return f"```python\n{action}\n```"
class OpenCUAAgent:
"""
OpenCUA Agent for desktop automation tasks.
This class implements an OpenCUA-model-based agent that can observe
desktop environments through screenshots and execute mouse/keyboard actions
via PyAutoGUI to complete automation tasks.
Attributes:
model (str): Name of the language model being used
history_type (str): Type of history recording mechanism
actions (list): History of executed actions
observations (list): History of environment observations
cots (list): Chain of thought reasoning records
"""
def __init__(
self,
model: str, # OpenCUA model name
history_type: str, # History step type: action_history, thought_history, observation_history
max_image_history_length: int = 3, # The max number of images in the history
platform: str = "ubuntu", # The platform of the computer
max_tokens: int = 1500, # The max number of tokens in the response
top_p: float = 0.9, # The top p value in the response
temperature: float = 0, # The temperature value in the response
action_space: str = "pyautogui", # The action space: pyautogui
observation_type: str = "screenshot", # The observation type: screenshot
cot_level: str = "l2", # The CoT level: l1, l2, l3
screen_size: Tuple[int, int] = (1920, 1080), # The screen size
coordinate_type: str = "relative", # The coordinate type: relative, absolute, qwen25
**kwargs
):
assert coordinate_type in ["relative", "absolute", "qwen25"]
assert action_space in ["pyautogui"], "Invalid action space"
assert observation_type in ["screenshot"], "Invalid observation type"
assert history_type in ["action_history", "thought_history", "observation_history"]
assert model is not None, "Model cannot be None"
self.model = model
self.platform = platform
self.max_tokens = max_tokens
self.top_p = top_p
self.temperature = temperature
self.action_space = action_space
self.observation_type = observation_type
self.history_type = history_type
self.coordinate_type = coordinate_type
self.cot_level = cot_level
self.screen_size = screen_size
self.max_image_history_length = max_image_history_length
if history_type == "action_history":
self.HISTORY_TEMPLATE = ACTION_HISTORY_TEMPLATE
elif history_type == "thought_history":
self.HISTORY_TEMPLATE = THOUGHT_HISTORY_TEMPLATE
elif history_type == "observation_history":
self.HISTORY_TEMPLATE = OBSERVATION_HISTORY_TEMPLATE
else:
raise ValueError(f"Invalid history type: {history_type}")
if cot_level == "l3":
self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L3
elif cot_level == "l2":
self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L2
elif cot_level == "l1":
self.SYSTEM_PROMPT = AGNET_SYS_PROMPT_L1
else:
raise ValueError(f"Invalid COT level: {cot_level}")
self.actions = []
self.observations = []
self.cots = []
def reset(self, _logger=None):
global logger
logger = _logger if _logger is not None else logging.getLogger("desktopenv.agent")
self.observations = []
self.cots = []
self.actions = []
def _scale_scroll_for_windows(self, code: str, factor: int = 50) -> str:
""" pyautogui.scroll has a different scale on Ubuntu and Windows, multiple 'factor' when scrolling on Windows system"""
if self.platform.lower() != "windows":
return code
pattern_pos = re.compile(r'(pyautogui\.scroll\()\s*([-+]?\d+)\s*\)')
code = pattern_pos.sub(lambda m: f"{m.group(1)}{int(m.group(2))*factor})", code)
return code
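# Editor's illustrative note: on Windows this turns e.g. pyautogui.scroll(-3) into
# pyautogui.scroll(-150); on other platforms the code is returned unchanged.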
def predict(self, instruction: str, obs: Dict, **kwargs) -> Tuple[str, List[str], Dict]:
"""
Predict the next action(s) based on the current observation.
"""
if "step_idx" in kwargs:
logger.info(f"========= {self.model} Step {kwargs['step_idx']} =======")
else:
logger.info(f"========================== {self.model} ===================================")
logger.info(f"Instruction: \n{instruction}")
messages = []
messages.append({
"role": "system",
"content": self.SYSTEM_PROMPT
})
history_step_texts = []
for i in range(len(self.actions)):
if i > len(self.actions) - self.max_image_history_length:
messages.append({
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{encode_image(self.observations[i]['screenshot'])}"}
}
]
})
history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format(
observation=self.cots[i].get('observation'),
thought=self.cots[i].get('thought'),
action=self.cots[i].get('action')
)
messages.append({
"role": "assistant",
"content": history_content
})
else:
history_content = STEP_TEMPLATE.format(step_num=i+1) + self.HISTORY_TEMPLATE.format(
observation=self.cots[i].get('observation'),
thought=self.cots[i].get('thought'),
action=self.cots[i].get('action')
)
history_step_texts.append(history_content)
if i == len(self.actions) - self.max_image_history_length:
messages.append({
"role":"assistant",
"content": "\n".join(history_step_texts)
})
messages.append({
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{encode_image(obs['screenshot'])}"}
},
{
"type": "text",
"text": INSTRUTION_TEMPLATE.format(instruction=instruction)
}
]
})
response = self.call_llm({
"model": self.model,
"messages": messages,
"max_tokens": self.max_tokens,
"top_p": self.top_p,
"temperature": self.temperature
}, self.model)
logger.info(f"Model Output: \n{response}")
if not response:
logger.error("No response found in the response.")
return "ERROR", ["DONE"], {}
low_level_instruction, pyautogui_actions, other_cot = parse_response_to_cot_and_action(response, self.screen_size, self.coordinate_type)
if not pyautogui_actions or len(pyautogui_actions) == 0:
logger.error("No pyautogui actions found in the response.")
return response, ["FAIL"], {}
pyautogui_actions = [
self._scale_scroll_for_windows(code) for code in pyautogui_actions
]
self.observations.append(obs)
logger.info(f"Parsed Low-level Action: \n{low_level_instruction}")
logger.info(f"Parsed pyautogui Action: \n{pyautogui_actions}")
self.actions.append(low_level_instruction)
if 'action' not in other_cot or not other_cot['action'] or 'thought' not in other_cot or not other_cot['thought']:
logger.error("Error! no action/thought in cot")
logger.error(f"response: {response}")
logger.error(f"cot: {other_cot}")
self.cots.append(other_cot)
# Print message structure if needed
# messages_to_print = []
# current_image = 1
# for msg in messages:
# msg_copy = copy.deepcopy(msg)
# if isinstance(msg_copy['content'], list):
# for content in msg_copy['content']:
# if content['type'] == 'image_url':
# content['image_url']['url'] = f'Image {current_image}'
# current_image += 1
# messages_to_print.append(msg_copy)
# messages_to_print.append({
# "new_step_cot": other_cot,
# "response": response
# })
# logger.info(json.dumps(messages_to_print, indent=2))
logger.info(f"New step cot: {other_cot}")
return response, pyautogui_actions, {}
@backoff.on_exception(
backoff.constant,
# NOTE: this decorator currently retries on the broad Exception type; listing only
# model/HTTP-specific exceptions here would let unexpected errors propagate to the
# outer loop, which enforces the per-example time limit.
(
Exception
),
interval=30,
max_tries=10
)
def call_llm(self, payload, model):
"""Call the LLM API"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {os.environ['OPENCUA_API_KEY']}"
}
for _ in range(30):
response = httpx.post(
os.environ['OPENCUA_URL'],
headers=headers,
json=payload,
timeout=500,
verify=False
)
if response.status_code != 200:
logger.error("Failed to call LLM: " + response.text)
logger.error("Retrying...")
time.sleep(5)
else:
response = response.json()
finish_reason = response["choices"][0].get("finish_reason")
if finish_reason is not None and finish_reason == "stop": # for most of the time, length will not exceed max_tokens
return response['choices'][0]['message']['content']
else:
logger.error("LLM did not finish properly, retrying...")
time.sleep(5)
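call_llm above posts an OpenAI-compatible chat-completions payload to the URL in OPENCUA_URL with a bearer token from OPENCUA_API_KEY. A minimal configuration sketch before launching the harness; the endpoint path and key value below are placeholders, not taken from this diff:
```
# Hypothetical configuration sketch; endpoint and key are placeholders.
import os

os.environ["OPENCUA_URL"] = "http://localhost:8000/v1/chat/completions"  # assumed OpenAI-compatible server
os.environ["OPENCUA_API_KEY"] = "EMPTY"  # any token accepted by the serving endpoint
```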


@ -3,29 +3,34 @@
You should first host the OpenCUA model on your local machine or a server.
Command for OpenCUA-72B:
```
python run_multienv_opencua.py \
--headless \
--observation_type screenshot \
--model OpenCUA-72B \
--result_dir ./results \
--test_all_meta_path evaluation_examples/test_nogdrive.json \
--max_steps 100 \
--num_envs 30 \
--coordinate_type qwen25
```
Command for OpenCUA-7B and OpenCUA-32B:
```
python run_multienv_opencua.py \
--headless \
--observation_type screenshot \
--model OpenCUA-32B \
--result_dir ./results --test_all_meta_path evaluation_examples/test_all_no_gdrive.json \
--max_steps 100 \
--num_envs 30 \
--coordinate_type qwen25
```
Command for OpenCUA-Qwen2-7B and OpenCUA-A3B:
```
python run_multienv_opencua.py \
--headless \
--observation_type screenshot \
--model OpenCUA-A3B \
--result_dir ./results \
--result_dir ./results\
--test_all_meta_path evaluation_examples/test_nogdrive.json \
--max_steps 100 \
--num_envs 10 \
--coordinate_type relative
--num_envs 30 \
--coordinate_type qwen25 \
--use_old_sys_prompt
```
"""
@ -44,7 +49,7 @@ from multiprocessing import Process, Manager
from multiprocessing import current_process
import lib_run_single
from desktop_env.desktop_env import DesktopEnv
from mm_agents.opencua_agent import OpenCUAAgent
from mm_agents.opencua import OpenCUAAgent
# Global variables for signal handling
active_environments = []
@ -76,8 +81,8 @@ def config() -> argparse.Namespace:
default="screenshot",
help="Observation type",
)
parser.add_argument("--sleep_after_execution", type=float, default=3.0)
parser.add_argument("--max_steps", type=int, default=15)
parser.add_argument("--sleep_after_execution", type=float, default=5.0)
parser.add_argument("--max_steps", type=int, default=100)
# evaluation config
parser.add_argument(
@ -85,7 +90,7 @@ def config() -> argparse.Namespace:
)
# lm config
parser.add_argument("--model", type=str, default="opencua")
parser.add_argument("--model", type=str, default=None)
parser.add_argument("--temperature", type=float, default=0)
parser.add_argument("--top_p", type=float, default=0.9)
parser.add_argument("--max_tokens", type=int, default=2048)
@ -94,13 +99,14 @@ def config() -> argparse.Namespace:
# OpenCUAagent config
parser.add_argument("--cot_level", type=str, default="l2", help="CoT version: l1, l2, l3. Default is l2 includes 'thought' and 'action'")
parser.add_argument("--history_type", type=str, default="action_history", help="Use action to represent history steps", choices=["action_history", "thought_history", "observation_history"])
parser.add_argument("--coordinate_type", type=str, default="relative", help="Type of coordinate: Qwen2-VL or Kimi-VL based models use 'relative'; Qwen2.5-VL based models use 'qwen25'", choices=["relative", "qwen25"])
parser.add_argument("--coordinate_type", type=str, default="qwen25", help="Type of coordinate: Qwen2-VL or Kimi-VL based models use 'relative'; Qwen2.5-VL based models use 'qwen25'", choices=["relative", "qwen25"])
parser.add_argument("--max_image_history_length", type=int, default=3, help="The max number of images in the history.")
parser.add_argument("--use_old_sys_prompt", action="store_true", help="Use the old system prompt for OpenCUA-7B and OpenCUA-32B")
# example config
parser.add_argument("--domain", type=str, default="all")
parser.add_argument(
"--test_all_meta_path", type=str, default="evaluation_examples/test_all.json"
"--test_all_meta_path", type=str, default="evaluation_examples/test_nogdrive.json"
)
# logging related
@ -124,6 +130,9 @@ def config() -> argparse.Namespace:
parser.add_argument(
"--screen_height", type=int, default=1080, help="Screen height"
)
parser.add_argument(
"--password", type=str, default="osworld-public-evaluation", help="The password for the computer if needed"
)
args = parser.parse_args()
return args
@ -253,6 +262,9 @@ def run_env_tasks(task_queue: Queue, args: argparse.Namespace, shared_scores: li
screen_size=(args.screen_width, args.screen_height),
coordinate_type=args.coordinate_type,
max_image_history_length=args.max_image_history_length,
max_steps=args.max_steps,
use_old_sys_prompt=args.use_old_sys_prompt,
password=args.password,
)
try:
lib_run_single.run_single_example_opencua(