Compare commits
1 Commits
main
...
djlu/qwen3
| Author | SHA1 | Date |
|---|---|---|
|
|
23874ca9e9 |
|
|
@ -197,4 +197,11 @@ vmware_vm_data
|
|||
|
||||
.vscode
|
||||
|
||||
dataimpulse_proxy_config.json
|
||||
dataimpulse_proxy_config.json
|
||||
|
||||
## reference and draft and debug
|
||||
reference/
|
||||
draft/
|
||||
manual_examine.py
|
||||
run_human_examine.sh
|
||||
quick_start.py
|
||||
|
|
@ -0,0 +1,585 @@
|
|||
import base64
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import os
|
||||
from io import BytesIO
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
import backoff
|
||||
import openai
|
||||
from PIL import Image
|
||||
from requests.exceptions import SSLError
|
||||
from google.api_core.exceptions import (
|
||||
InvalidArgument,
|
||||
ResourceExhausted,
|
||||
InternalServerError,
|
||||
BadRequest,
|
||||
)
|
||||
from mm_agents.utils.qwen_vl_utils import smart_resize
|
||||
|
||||
|
||||
logger = None
|
||||
|
||||
MAX_RETRY_TIMES = 5
|
||||
|
||||
|
||||
def encode_image(image_content):
|
||||
return base64.b64encode(image_content).decode("utf-8")
|
||||
|
||||
|
||||
def process_image(image_bytes):
|
||||
"""
|
||||
Process an image for Qwen VL models (thinking variant).
|
||||
Uses a tighter resize cap consistent with the thinking DUN agent.
|
||||
"""
|
||||
image = Image.open(BytesIO(image_bytes))
|
||||
width, height = image.size
|
||||
|
||||
resized_height, resized_width = smart_resize(
|
||||
height=height,
|
||||
width=width,
|
||||
factor=32,
|
||||
max_pixels=16 * 16 * 4 * 1280,
|
||||
)
|
||||
|
||||
image = image.resize((resized_width, resized_height))
|
||||
|
||||
buffer = BytesIO()
|
||||
image.save(buffer, format="PNG")
|
||||
processed_bytes = buffer.getvalue()
|
||||
|
||||
return base64.b64encode(processed_bytes).decode("utf-8")
|
||||
|
||||
|
||||
class Qwen3VLAgent:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
platform: str = "ubuntu",
|
||||
model: str = "qwen3-vl",
|
||||
max_tokens: int = 1500,
|
||||
top_p: float = 0.9,
|
||||
temperature: float = 0.0,
|
||||
action_space: str = "pyautogui",
|
||||
observation_type: str = "screenshot",
|
||||
history_n: int = 4,
|
||||
add_thought_prefix: bool = False,
|
||||
coordinate_type: str = "relative",
|
||||
):
|
||||
self.platform = platform
|
||||
self.model = model
|
||||
self.max_tokens = max_tokens
|
||||
self.top_p = top_p
|
||||
self.temperature = temperature
|
||||
self.action_space = action_space
|
||||
self.observation_type = observation_type
|
||||
self.history_n = history_n
|
||||
self.add_thought_prefix = add_thought_prefix
|
||||
self.coordinate_type = coordinate_type
|
||||
|
||||
assert action_space in ["pyautogui"], "Invalid action space"
|
||||
assert observation_type in ["screenshot"], "Invalid observation type"
|
||||
|
||||
self.thoughts = []
|
||||
self.actions = []
|
||||
self.observations = []
|
||||
self.responses = []
|
||||
self.screenshots = []
|
||||
|
||||
def predict(self, instruction: str, obs: Dict) -> List:
|
||||
"""
|
||||
Predict the next action(s) based on the current observation.
|
||||
Returns (response, pyautogui_code).
|
||||
"""
|
||||
screenshot_bytes = obs["screenshot"]
|
||||
|
||||
image = Image.open(BytesIO(screenshot_bytes))
|
||||
width, height = image.size
|
||||
print(f"Original screen resolution: {width}x{height}")
|
||||
|
||||
processed_image = process_image(screenshot_bytes)
|
||||
processed_img = Image.open(
|
||||
BytesIO(base64.b64decode(processed_image))
|
||||
)
|
||||
processed_width, processed_height = processed_img.size
|
||||
print(
|
||||
"Processed image resolution: "
|
||||
f"{processed_width}x{processed_height}"
|
||||
)
|
||||
|
||||
self.screenshots.append(processed_image)
|
||||
|
||||
current_step = len(self.actions)
|
||||
history_start_idx = max(0, current_step - self.history_n)
|
||||
|
||||
previous_actions = []
|
||||
for i in range(history_start_idx):
|
||||
if i < len(self.actions):
|
||||
previous_actions.append(f"Step {i+1}: {self.actions[i]}")
|
||||
previous_actions_str = (
|
||||
"\n".join(previous_actions) if previous_actions else "None"
|
||||
)
|
||||
|
||||
description_prompt_lines = [
|
||||
"Use a mouse and keyboard to interact with a computer, and take screenshots.",
|
||||
"* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.",
|
||||
"* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. if you click on Firefox and a window doesn't open, try wait and taking another screenshot.",
|
||||
(
|
||||
f"* The screen's resolution is {processed_width}x{processed_height}."
|
||||
if self.coordinate_type == "absolute"
|
||||
else "* The screen's resolution is 1000x1000."
|
||||
),
|
||||
"* Whenever you intend to move the cursor to click on an element like an icon, you should consult a screenshot to determine the coordinates of the element before moving the cursor.",
|
||||
"* If you tried clicking on a program or link but it failed to load even after waiting, try adjusting your cursor position so that the tip of the cursor visually falls on the element that you want to click.",
|
||||
"* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.",
|
||||
]
|
||||
description_prompt = "\n".join(description_prompt_lines)
|
||||
|
||||
action_description_prompt = """
|
||||
* `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order.
|
||||
* `type`: Type a string of text on the keyboard.
|
||||
* `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen.
|
||||
* `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen.
|
||||
* `left_click_drag`: Click and drag the cursor to a specified (x, y) pixel coordinate on the screen.
|
||||
* `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen.
|
||||
* `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen.
|
||||
* `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen.
|
||||
* `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen (simulated as double-click since it's the closest action).
|
||||
* `scroll`: Performs a scroll of the mouse scroll wheel.
|
||||
* `hscroll`: Performs a horizontal scroll (mapped to regular scroll).
|
||||
* `wait`: Wait specified seconds for the change to happen.
|
||||
* `terminate`: Terminate the current task and report its completion status.
|
||||
* `answer`: Answer a question.
|
||||
"""
|
||||
|
||||
tools_def = {
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name_for_human": "computer_use",
|
||||
"name": "computer_use",
|
||||
"description": description_prompt,
|
||||
"parameters": {
|
||||
"properties": {
|
||||
"action": {
|
||||
"description": action_description_prompt,
|
||||
"enum": ["key", "type", "mouse_move", "left_click", "left_click_drag",
|
||||
"right_click", "middle_click", "double_click", "scroll", "wait", "terminate"],
|
||||
"type": "string"
|
||||
},
|
||||
"keys": {"description": "Required only by `action=key`.", "type": "array"},
|
||||
"text": {"description": "Required only by `action=type`.", "type": "string"},
|
||||
"coordinate": {"description": "The x,y coordinates for mouse actions.", "type": "array"},
|
||||
"pixels": {"description": "The amount of scrolling.", "type": "number"},
|
||||
"time": {"description": "The seconds to wait.", "type": "number"},
|
||||
"status": {
|
||||
"description": "The status of the task.",
|
||||
"type": "string",
|
||||
"enum": ["success", "failure"]
|
||||
}
|
||||
},
|
||||
"required": ["action"],
|
||||
"type": "object"
|
||||
},
|
||||
"args_format": "Format the arguments as a JSON object."
|
||||
}
|
||||
}
|
||||
|
||||
system_prompt = """# Tools
|
||||
|
||||
You may call one or more functions to assist with the user query.
|
||||
|
||||
You are provided with function signatures within <tools></tools> XML tags:
|
||||
<tools>
|
||||
""" + json.dumps(tools_def) + """
|
||||
</tools>
|
||||
|
||||
For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:
|
||||
<tool_call>
|
||||
{"name": <function-name>, "arguments": <args-json-object>}
|
||||
</tool_call>
|
||||
|
||||
# Response format
|
||||
|
||||
Response format for every step:
|
||||
1) Action: a short imperative describing what to do in the UI.
|
||||
2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}.
|
||||
|
||||
Rules:
|
||||
- Output exactly in the order: Action, <tool_call>.
|
||||
- Be brief: one sentence for Action.
|
||||
- Do not output anything else outside those parts.
|
||||
- If finishing, use action=terminate in the tool call."""
|
||||
|
||||
instruction_prompt = f"""
|
||||
Please generate the next move according to the UI screenshot, instruction and previous actions.
|
||||
|
||||
Instruction: {instruction}
|
||||
|
||||
Previous actions:
|
||||
{previous_actions_str}"""
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": [
|
||||
{"type": "text", "text": system_prompt},
|
||||
],
|
||||
}
|
||||
]
|
||||
|
||||
history_len = min(self.history_n, len(self.responses))
|
||||
if history_len > 0:
|
||||
history_responses = self.responses[-history_len:]
|
||||
history_screenshots = self.screenshots[-history_len - 1:-1]
|
||||
|
||||
for idx in range(history_len):
|
||||
if idx < len(history_screenshots):
|
||||
screenshot_b64 = history_screenshots[idx]
|
||||
if idx == 0:
|
||||
img_url = f"data:image/png;base64,{screenshot_b64}"
|
||||
messages.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": img_url},
|
||||
},
|
||||
{"type": "text", "text": instruction_prompt},
|
||||
],
|
||||
}
|
||||
)
|
||||
else:
|
||||
img_url = f"data:image/png;base64,{screenshot_b64}"
|
||||
messages.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": img_url},
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
messages.append(
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "text", "text": f"{history_responses[idx]}"},
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
curr_img_url = f"data:image/png;base64,{processed_image}"
|
||||
messages.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": curr_img_url},
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
else:
|
||||
curr_img_url = f"data:image/png;base64,{processed_image}"
|
||||
messages.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": curr_img_url},
|
||||
},
|
||||
{"type": "text", "text": instruction_prompt},
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
# Debug: save messages before sending to model
|
||||
try:
|
||||
draft_dir = "./draft/message_cache"
|
||||
os.makedirs(draft_dir, exist_ok=True)
|
||||
message_file_path = os.path.join(
|
||||
draft_dir, f"messages_step_{current_step}.json"
|
||||
)
|
||||
with open(message_file_path, "w") as f:
|
||||
json.dump(messages, f)
|
||||
except Exception as _e: # do not fail prediction due to debug IO
|
||||
pass
|
||||
|
||||
response = self.call_llm(
|
||||
{
|
||||
"model": self.model,
|
||||
"messages": messages,
|
||||
"max_tokens": self.max_tokens,
|
||||
"top_p": self.top_p,
|
||||
"temperature": self.temperature,
|
||||
},
|
||||
self.model,
|
||||
)
|
||||
|
||||
logger.info(f"Qwen3VL Output: {response}")
|
||||
|
||||
self.responses.append(response)
|
||||
|
||||
low_level_instruction, pyautogui_code = self.parse_response(
|
||||
response,
|
||||
width,
|
||||
height,
|
||||
processed_width,
|
||||
processed_height,
|
||||
)
|
||||
|
||||
logger.info(f"Low level instruction: {low_level_instruction}")
|
||||
logger.info(f"Pyautogui code: {pyautogui_code}")
|
||||
|
||||
self.actions.append(low_level_instruction)
|
||||
|
||||
return response, pyautogui_code
|
||||
|
||||
def parse_response(
|
||||
self,
|
||||
response: str,
|
||||
original_width: int = None,
|
||||
original_height: int = None,
|
||||
processed_width: int = None,
|
||||
processed_height: int = None,
|
||||
) -> Tuple[str, List[str]]:
|
||||
"""
|
||||
Parse LLM response and convert it to low level action and pyautogui code.
|
||||
"""
|
||||
low_level_instruction = ""
|
||||
pyautogui_code: List[str] = []
|
||||
|
||||
if response is None or not response.strip():
|
||||
return low_level_instruction, pyautogui_code
|
||||
|
||||
def adjust_coordinates(x: float, y: float) -> Tuple[int, int]:
|
||||
if not (original_width and original_height):
|
||||
return int(x), int(y)
|
||||
if self.coordinate_type == "absolute":
|
||||
# scale from processed pixels to original
|
||||
if processed_width and processed_height:
|
||||
x_scale = original_width / processed_width
|
||||
y_scale = original_height / processed_height
|
||||
return int(x * x_scale), int(y * y_scale)
|
||||
return int(x), int(y)
|
||||
# relative: scale from 0..999 grid
|
||||
x_scale = original_width / 999
|
||||
y_scale = original_height / 999
|
||||
return int(x * x_scale), int(y * y_scale)
|
||||
|
||||
def process_tool_call(json_str: str) -> None:
|
||||
try:
|
||||
tool_call = json.loads(json_str)
|
||||
if tool_call.get("name") == "computer_use":
|
||||
args = tool_call["arguments"]
|
||||
action = args["action"]
|
||||
|
||||
if action == "left_click":
|
||||
if "coordinate" in args:
|
||||
x, y = args["coordinate"]
|
||||
adj_x, adj_y = adjust_coordinates(x, y)
|
||||
pyautogui_code.append(f"pyautogui.click({adj_x}, {adj_y})")
|
||||
else:
|
||||
pyautogui_code.append("pyautogui.click()")
|
||||
|
||||
elif action == "right_click":
|
||||
if "coordinate" in args:
|
||||
x, y = args["coordinate"]
|
||||
adj_x, adj_y = adjust_coordinates(x, y)
|
||||
pyautogui_code.append(
|
||||
f"pyautogui.rightClick({adj_x}, {adj_y})"
|
||||
)
|
||||
else:
|
||||
pyautogui_code.append("pyautogui.rightClick()")
|
||||
|
||||
elif action == "middle_click":
|
||||
if "coordinate" in args:
|
||||
x, y = args["coordinate"]
|
||||
adj_x, adj_y = adjust_coordinates(x, y)
|
||||
pyautogui_code.append(
|
||||
f"pyautogui.middleClick({adj_x}, {adj_y})"
|
||||
)
|
||||
else:
|
||||
pyautogui_code.append("pyautogui.middleClick()")
|
||||
|
||||
elif action == "double_click":
|
||||
if "coordinate" in args:
|
||||
x, y = args["coordinate"]
|
||||
adj_x, adj_y = adjust_coordinates(x, y)
|
||||
pyautogui_code.append(
|
||||
f"pyautogui.doubleClick({adj_x}, {adj_y})"
|
||||
)
|
||||
else:
|
||||
pyautogui_code.append("pyautogui.doubleClick()")
|
||||
|
||||
elif action == "type":
|
||||
text = args.get("text", "")
|
||||
pyautogui_code.append(f"pyautogui.typewrite('{text}')")
|
||||
|
||||
elif action == "key":
|
||||
keys = args.get("keys", [])
|
||||
if isinstance(keys, list):
|
||||
cleaned_keys = []
|
||||
for key in keys:
|
||||
if isinstance(key, str):
|
||||
if key.startswith("keys=["):
|
||||
key = key[6:]
|
||||
if key.endswith("]"):
|
||||
key = key[:-1]
|
||||
if key.startswith("['") or key.startswith('["'):
|
||||
key = key[2:] if len(key) > 2 else key
|
||||
if key.endswith("']") or key.endswith('"]'):
|
||||
key = key[:-2] if len(key) > 2 else key
|
||||
key = key.strip()
|
||||
cleaned_keys.append(key)
|
||||
else:
|
||||
cleaned_keys.append(key)
|
||||
keys = cleaned_keys
|
||||
|
||||
keys_str = ", ".join([f"'{key}'" for key in keys])
|
||||
if len(keys) > 1:
|
||||
pyautogui_code.append(f"pyautogui.hotkey({keys_str})")
|
||||
else:
|
||||
pyautogui_code.append(f"pyautogui.press({keys_str})")
|
||||
|
||||
elif action == "scroll":
|
||||
pixels = args.get("pixels", 0)
|
||||
pyautogui_code.append(f"pyautogui.scroll({pixels})")
|
||||
|
||||
elif action == "wait":
|
||||
pyautogui_code.append("WAIT")
|
||||
|
||||
elif action == "terminate":
|
||||
pyautogui_code.append("DONE")
|
||||
|
||||
elif action == "mouse_move":
|
||||
if "coordinate" in args:
|
||||
x, y = args["coordinate"]
|
||||
adj_x, adj_y = adjust_coordinates(x, y)
|
||||
pyautogui_code.append(
|
||||
f"pyautogui.moveTo({adj_x}, {adj_y})"
|
||||
)
|
||||
else:
|
||||
pyautogui_code.append("pyautogui.moveTo(0, 0)")
|
||||
|
||||
elif action == "left_click_drag":
|
||||
if "coordinate" in args:
|
||||
x, y = args["coordinate"]
|
||||
adj_x, adj_y = adjust_coordinates(x, y)
|
||||
duration = args.get("duration", 0.5)
|
||||
pyautogui_code.append(
|
||||
f"pyautogui.dragTo({adj_x}, {adj_y}, duration={duration})"
|
||||
)
|
||||
else:
|
||||
pyautogui_code.append("pyautogui.dragTo(0, 0)")
|
||||
except (json.JSONDecodeError, KeyError) as e:
|
||||
logger.error(f"Failed to parse tool call: {e}")
|
||||
|
||||
lines = response.split("\n")
|
||||
inside_tool_call = False
|
||||
current_tool_call: List[str] = []
|
||||
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if line.lower().startswith(("action:")):
|
||||
if not low_level_instruction:
|
||||
low_level_instruction = line.split("Action:")[-1].strip()
|
||||
continue
|
||||
|
||||
if line.startswith("<tool_call>"):
|
||||
inside_tool_call = True
|
||||
continue
|
||||
elif line.startswith("</tool_call>"):
|
||||
if current_tool_call:
|
||||
process_tool_call("\n".join(current_tool_call))
|
||||
current_tool_call = []
|
||||
inside_tool_call = False
|
||||
continue
|
||||
|
||||
if inside_tool_call:
|
||||
current_tool_call.append(line)
|
||||
continue
|
||||
|
||||
if line.startswith("{") and line.endswith("}"):
|
||||
try:
|
||||
json_obj = json.loads(line)
|
||||
if "name" in json_obj and "arguments" in json_obj:
|
||||
process_tool_call(line)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
if current_tool_call:
|
||||
process_tool_call("\n".join(current_tool_call))
|
||||
|
||||
if not low_level_instruction and len(pyautogui_code) > 0:
|
||||
action_type = pyautogui_code[0].split(".", 1)[1].split("(", 1)[0]
|
||||
low_level_instruction = f"Performing {action_type} action"
|
||||
|
||||
return low_level_instruction, pyautogui_code
|
||||
|
||||
@backoff.on_exception(
|
||||
backoff.constant,
|
||||
(
|
||||
SSLError,
|
||||
openai.RateLimitError,
|
||||
openai.BadRequestError,
|
||||
openai.InternalServerError,
|
||||
InvalidArgument,
|
||||
ResourceExhausted,
|
||||
InternalServerError,
|
||||
BadRequest,
|
||||
),
|
||||
interval=30,
|
||||
max_tries=5,
|
||||
)
|
||||
def call_llm(self, payload, model):
|
||||
messages = payload["messages"]
|
||||
|
||||
base_url = "https://poc-dashscope.aliyuncs.com/compatible-mode/v1"
|
||||
api_key = "sk-123"
|
||||
client = openai.OpenAI(base_url=base_url, api_key=api_key)
|
||||
|
||||
for _ in range(MAX_RETRY_TIMES):
|
||||
logger.info("Generating content with Qwen model: %s", model)
|
||||
try:
|
||||
response = client.chat.completions.create(
|
||||
model=model,
|
||||
messages=messages,
|
||||
max_tokens=self.max_tokens,
|
||||
temperature=self.temperature,
|
||||
top_p=self.top_p,
|
||||
)
|
||||
return response.choices[0].message.content
|
||||
except Exception as e:
|
||||
logger.error(f"Error calling Qwen model: {e}")
|
||||
time.sleep(5)
|
||||
continue
|
||||
return ""
|
||||
|
||||
def reset(self, _logger=None):
|
||||
global logger
|
||||
logger = (
|
||||
_logger if _logger is not None
|
||||
else logging.getLogger("desktopenv.qwen3vl_agent")
|
||||
)
|
||||
|
||||
self.thoughts = []
|
||||
self.action_descriptions = (
|
||||
[] if hasattr(self, "action_descriptions") else []
|
||||
)
|
||||
self.actions = []
|
||||
self.observations = []
|
||||
self.responses = []
|
||||
self.screenshots = []
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,511 @@
|
|||
from __future__ import annotations
|
||||
import argparse
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import signal
|
||||
import time
|
||||
from typing import List
|
||||
from multiprocessing import Process, Manager
|
||||
from multiprocessing import current_process
|
||||
import lib_run_single
|
||||
from desktop_env.desktop_env import DesktopEnv
|
||||
from mm_agents.qwen3vl_agent import Qwen3VLAgent
|
||||
|
||||
# Global variables for signal handling
|
||||
active_environments = []
|
||||
processes = []
|
||||
is_terminating = False
|
||||
|
||||
# load the environment variables from .env file
|
||||
if os.path.exists(".env"):
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
|
||||
def config() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Run end-to-end evaluation on the benchmark (Qwen3VL)"
|
||||
)
|
||||
|
||||
# environment config
|
||||
parser.add_argument("--path_to_vm", type=str, default=None)
|
||||
parser.add_argument(
|
||||
"--headless", action="store_true", help="Run in headless machine"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--action_space", type=str, default="pyautogui", help="Action type"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--observation_type",
|
||||
choices=["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"],
|
||||
default="screenshot",
|
||||
help="Observation type",
|
||||
)
|
||||
parser.add_argument("--sleep_after_execution", type=float, default=0.0)
|
||||
parser.add_argument("--max_steps", type=int, default=15)
|
||||
|
||||
# agent config
|
||||
parser.add_argument("--max_trajectory_length", type=int, default=3)
|
||||
parser.add_argument(
|
||||
"--test_config_base_dir", type=str, default="evaluation_examples"
|
||||
)
|
||||
|
||||
# lm config
|
||||
parser.add_argument("--model", type=str, default="qwen3-vl")
|
||||
parser.add_argument("--temperature", type=float, default=0)
|
||||
parser.add_argument("--top_p", type=float, default=0.9)
|
||||
parser.add_argument("--max_tokens", type=int, default=1500)
|
||||
parser.add_argument("--stop_token", type=str, default=None)
|
||||
parser.add_argument(
|
||||
"--coord",
|
||||
type=str,
|
||||
choices=["absolute", "relative"],
|
||||
default="absolute",
|
||||
help="Coordinate system for agent outputs (absolute or relative)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--add_thought_prefix",
|
||||
action="store_true",
|
||||
help="Add thought prefix to the response",
|
||||
)
|
||||
|
||||
# example config
|
||||
parser.add_argument("--domain", type=str, default="all")
|
||||
parser.add_argument(
|
||||
"--test_all_meta_path", type=str, default="evaluation_examples/test_nogdrive.json"
|
||||
)
|
||||
|
||||
# logging related
|
||||
parser.add_argument("--result_dir", type=str, default="./results")
|
||||
parser.add_argument(
|
||||
"--num_envs", type=int, default=1, help="Number of environments to run in parallel"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--log_level",
|
||||
type=str,
|
||||
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
|
||||
default="INFO",
|
||||
help="Set the logging level",
|
||||
)
|
||||
|
||||
# provider config
|
||||
parser.add_argument(
|
||||
"--region", type=str, default="us-east-1", help="AWS region for the VM"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--provider_name",
|
||||
type=str,
|
||||
default="docker",
|
||||
choices=["aws", "virtualbox", "vmware", "docker", "azure"],
|
||||
help="Provider name",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--client_password", type=str, default="", help="Client password"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--screen_width", type=int, default=1920, help="Screen width"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--screen_height", type=int, default=1080, help="Screen height"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
|
||||
|
||||
args = config() # Get command line arguments first
|
||||
|
||||
logger = logging.getLogger()
|
||||
log_level = getattr(logging, args.log_level.upper())
|
||||
logger.setLevel(log_level)
|
||||
|
||||
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
|
||||
|
||||
file_handler = logging.FileHandler(
|
||||
os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8"
|
||||
)
|
||||
debug_handler = logging.FileHandler(
|
||||
os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8"
|
||||
)
|
||||
stdout_handler = logging.StreamHandler(sys.stdout)
|
||||
|
||||
file_handler.setLevel(logging.INFO)
|
||||
debug_handler.setLevel(logging.DEBUG)
|
||||
stdout_handler.setLevel(log_level)
|
||||
|
||||
formatter = logging.Formatter(
|
||||
fmt=(
|
||||
"\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s "
|
||||
"\x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] "
|
||||
"\x1b[0m%(message)s"
|
||||
)
|
||||
)
|
||||
file_handler.setFormatter(formatter)
|
||||
debug_handler.setFormatter(formatter)
|
||||
stdout_handler.setFormatter(formatter)
|
||||
|
||||
stdout_handler.addFilter(logging.Filter("desktopenv"))
|
||||
|
||||
logger.addHandler(file_handler)
|
||||
logger.addHandler(debug_handler)
|
||||
logger.addHandler(stdout_handler)
|
||||
|
||||
logger = logging.getLogger("desktopenv.experiment")
|
||||
|
||||
|
||||
def distribute_tasks(test_all_meta: dict) -> List[tuple]:
|
||||
all_tasks = []
|
||||
for domain, examples in test_all_meta.items():
|
||||
for example_id in examples:
|
||||
all_tasks.append((domain, example_id))
|
||||
return all_tasks
|
||||
|
||||
|
||||
def run_env_tasks(task_queue, args: argparse.Namespace, shared_scores: list):
|
||||
active_environments = []
|
||||
env = None
|
||||
try:
|
||||
REGION = args.region
|
||||
screen_size = (args.screen_width, args.screen_height)
|
||||
snapshot_name = "init_state"
|
||||
if args.provider_name == "aws":
|
||||
from desktop_env.providers.aws.manager import IMAGE_ID_MAP
|
||||
ami_id = IMAGE_ID_MAP[REGION].get(screen_size, IMAGE_ID_MAP[REGION][(1920, 1080)])
|
||||
snapshot_name = ami_id
|
||||
env = DesktopEnv(
|
||||
path_to_vm=args.path_to_vm,
|
||||
action_space=args.action_space,
|
||||
provider_name=args.provider_name,
|
||||
region=REGION,
|
||||
snapshot_name=snapshot_name,
|
||||
screen_size=screen_size,
|
||||
headless=args.headless,
|
||||
os_type="Ubuntu",
|
||||
require_a11y_tree=args.observation_type in [
|
||||
"a11y_tree",
|
||||
"screenshot_a11y_tree",
|
||||
"som",
|
||||
],
|
||||
enable_proxy=True,
|
||||
client_password=args.client_password,
|
||||
)
|
||||
active_environments.append(env)
|
||||
agent = Qwen3VLAgent(
|
||||
model=args.model,
|
||||
max_tokens=args.max_tokens,
|
||||
top_p=args.top_p,
|
||||
temperature=args.temperature,
|
||||
action_space=args.action_space,
|
||||
coordinate_type=args.coord,
|
||||
add_thought_prefix=args.add_thought_prefix,
|
||||
)
|
||||
logger.info(f"Process {current_process().name} started.")
|
||||
while True:
|
||||
try:
|
||||
item = task_queue.get(timeout=5)
|
||||
except Exception:
|
||||
break
|
||||
domain, example_id = item
|
||||
try:
|
||||
config_file = os.path.join(
|
||||
args.test_config_base_dir, f"examples/{domain}/{example_id}.json"
|
||||
)
|
||||
with open(config_file, "r", encoding="utf-8") as f:
|
||||
example = json.load(f)
|
||||
logger.info(f"[{current_process().name}][Domain]: {domain}")
|
||||
logger.info(f"[{current_process().name}][Example ID]: {example_id}")
|
||||
logger.info(f"[{current_process().name}][Instruction]: {example['instruction']}")
|
||||
example_result_dir = os.path.join(
|
||||
args.result_dir,
|
||||
args.action_space,
|
||||
args.observation_type,
|
||||
args.model,
|
||||
domain,
|
||||
example_id,
|
||||
)
|
||||
os.makedirs(example_result_dir, exist_ok=True)
|
||||
try:
|
||||
lib_run_single.run_single_example(
|
||||
agent,
|
||||
env,
|
||||
example,
|
||||
args.max_steps,
|
||||
example["instruction"],
|
||||
args,
|
||||
example_result_dir,
|
||||
shared_scores,
|
||||
)
|
||||
except Exception as e:
|
||||
import traceback
|
||||
logger.error(f"Exception in {current_process().name} {domain}/{example_id}: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
try:
|
||||
env.controller.end_recording(
|
||||
os.path.join(example_result_dir, "recording.mp4")
|
||||
)
|
||||
except Exception as rec_e:
|
||||
logger.error(f"Failed to end recording: {rec_e}")
|
||||
with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f:
|
||||
f.write(json.dumps({"Error": f"{domain}/{example_id} - {e}"}))
|
||||
f.write("\n")
|
||||
except Exception as e:
|
||||
logger.error(f"Task-level error in {current_process().name}: {e}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
except Exception as e:
|
||||
logger.error(f"Process-level error in {current_process().name}: {e}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
finally:
|
||||
logger.info(f"{current_process().name} cleaning up environment...")
|
||||
try:
|
||||
if env:
|
||||
env.close()
|
||||
logger.info(f"{current_process().name} environment closed successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"{current_process().name} error during environment cleanup: {e}")
|
||||
|
||||
|
||||
def signal_handler(signum, frame):
|
||||
global is_terminating, active_environments, processes
|
||||
if is_terminating:
|
||||
return
|
||||
is_terminating = True
|
||||
logger.info(f"Received signal {signum}. Gracefully shutting down...")
|
||||
for env in active_environments:
|
||||
try:
|
||||
logger.info(f"Closing environment...")
|
||||
env.close()
|
||||
logger.info(f"Environment closed successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing environment: {e}")
|
||||
for p in processes:
|
||||
if p.is_alive():
|
||||
try:
|
||||
logger.info(f"Sending termination signal to process {p.name}...")
|
||||
p.terminate()
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending termination signal to process: {e}")
|
||||
time.sleep(1)
|
||||
for p in processes:
|
||||
if p.is_alive():
|
||||
try:
|
||||
logger.info(f"Forcefully terminating process {p.name}...")
|
||||
import signal as sig
|
||||
os.kill(p.pid, sig.SIGKILL)
|
||||
except Exception as e:
|
||||
logger.error(f"Error forcefully terminating process: {e}")
|
||||
logger.info("Shutdown complete. Exiting.")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def test(args: argparse.Namespace, test_all_meta: dict) -> None:
|
||||
global processes
|
||||
logger.info("Args: %s", args)
|
||||
all_tasks = distribute_tasks(test_all_meta)
|
||||
logger.info(f"Total tasks: {len(all_tasks)}")
|
||||
with Manager() as manager:
|
||||
shared_scores = manager.list()
|
||||
task_queue = manager.Queue()
|
||||
for item in all_tasks:
|
||||
task_queue.put(item)
|
||||
num_envs = args.num_envs
|
||||
processes = []
|
||||
for i in range(num_envs):
|
||||
p = Process(
|
||||
target=run_env_tasks,
|
||||
args=(task_queue, args, shared_scores),
|
||||
name=f"EnvProcess-{i+1}"
|
||||
)
|
||||
p.daemon = True
|
||||
p.start()
|
||||
processes.append(p)
|
||||
logger.info(f"Started process {p.name} with PID {p.pid}")
|
||||
try:
|
||||
while True:
|
||||
alive_count = 0
|
||||
for idx, p in enumerate(processes):
|
||||
if not p.is_alive():
|
||||
logger.warning(f"Process {p.name} died, restarting...")
|
||||
new_p = Process(
|
||||
target=run_env_tasks,
|
||||
args=(task_queue, args, shared_scores),
|
||||
name=f"EnvProcess-Restart-{idx+1}"
|
||||
)
|
||||
new_p.daemon = True
|
||||
new_p.start()
|
||||
processes[idx] = new_p
|
||||
logger.info(f"Restarted process {new_p.name} with PID {new_p.pid}")
|
||||
else:
|
||||
alive_count += 1
|
||||
if task_queue.empty():
|
||||
logger.info("All tasks finished.")
|
||||
break
|
||||
if alive_count == 0:
|
||||
logger.error("All processes died, exiting.")
|
||||
break
|
||||
time.sleep(5)
|
||||
for p in processes:
|
||||
p.join()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Main process received KeyboardInterrupt. Initiating graceful shutdown...")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error while waiting for processes: {e}", exc_info=True)
|
||||
for p in processes:
|
||||
if p.is_alive():
|
||||
try:
|
||||
logger.info(f"Terminating process {p.name} due to error...")
|
||||
p.terminate()
|
||||
except Exception as term_e:
|
||||
logger.error(f"Error terminating process {p.name}: {term_e}")
|
||||
raise
|
||||
scores = list(shared_scores)
|
||||
logger.info(f"Average score: {sum(scores) / len(scores) if scores else 0}")
|
||||
|
||||
|
||||
def get_unfinished(
|
||||
action_space, use_model, observation_type, result_dir, total_file_json
|
||||
):
|
||||
target_dir = os.path.join(result_dir, action_space, observation_type, use_model)
|
||||
|
||||
if not os.path.exists(target_dir):
|
||||
return total_file_json
|
||||
|
||||
finished = {}
|
||||
for domain in os.listdir(target_dir):
|
||||
finished[domain] = []
|
||||
domain_path = os.path.join(target_dir, domain)
|
||||
if os.path.isdir(domain_path):
|
||||
for example_id in os.listdir(domain_path):
|
||||
if example_id == "onboard":
|
||||
continue
|
||||
example_path = os.path.join(domain_path, example_id)
|
||||
if os.path.isdir(example_path):
|
||||
if "result.txt" not in os.listdir(example_path):
|
||||
for file in os.listdir(example_path):
|
||||
os.remove(os.path.join(example_path, file))
|
||||
else:
|
||||
finished[domain].append(example_id)
|
||||
|
||||
if not finished:
|
||||
return total_file_json
|
||||
|
||||
for domain, examples in finished.items():
|
||||
if domain in total_file_json:
|
||||
total_file_json[domain] = [
|
||||
x for x in total_file_json[domain] if x not in examples
|
||||
]
|
||||
|
||||
return total_file_json
|
||||
|
||||
|
||||
def get_result(action_space, use_model, observation_type, result_dir, total_file_json):
|
||||
target_dir = os.path.join(result_dir, action_space, observation_type, use_model)
|
||||
if not os.path.exists(target_dir):
|
||||
print("New experiment, no result yet.")
|
||||
return None
|
||||
|
||||
all_result = []
|
||||
|
||||
for domain in os.listdir(target_dir):
|
||||
domain_path = os.path.join(target_dir, domain)
|
||||
if os.path.isdir(domain_path):
|
||||
for example_id in os.listdir(domain_path):
|
||||
example_path = os.path.join(domain_path, example_id)
|
||||
if os.path.isdir(example_path):
|
||||
if "result.txt" in os.listdir(example_path):
|
||||
try:
|
||||
value_str = open(
|
||||
os.path.join(example_path, "result.txt"), "r"
|
||||
).read()
|
||||
all_result.append(float(value_str))
|
||||
except Exception:
|
||||
all_result.append(0.0)
|
||||
|
||||
if not all_result:
|
||||
print("New experiment, no result yet.")
|
||||
return None
|
||||
else:
|
||||
print("Current Success Rate:", sum(all_result) / len(all_result) * 100, "%")
|
||||
return all_result
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
try:
|
||||
args = config()
|
||||
path_to_args = os.path.join(
|
||||
args.result_dir,
|
||||
args.action_space,
|
||||
args.observation_type,
|
||||
args.model,
|
||||
"args.json",
|
||||
)
|
||||
os.makedirs(os.path.dirname(path_to_args), exist_ok=True)
|
||||
with open(path_to_args, "w", encoding="utf-8") as f:
|
||||
json.dump(vars(args), f, indent=4)
|
||||
|
||||
with open(args.test_all_meta_path, "r", encoding="utf-8") as f:
|
||||
test_all_meta = json.load(f)
|
||||
|
||||
if args.domain != "all":
|
||||
test_all_meta = {args.domain: test_all_meta[args.domain]}
|
||||
|
||||
test_file_list = get_unfinished(
|
||||
args.action_space,
|
||||
args.model,
|
||||
args.observation_type,
|
||||
args.result_dir,
|
||||
test_all_meta,
|
||||
)
|
||||
left_info = ""
|
||||
for domain in test_file_list:
|
||||
left_info += f"{domain}: {len(test_file_list[domain])}\n"
|
||||
logger.info(f"Left tasks:\n{left_info}")
|
||||
|
||||
get_result(
|
||||
args.action_space,
|
||||
args.model,
|
||||
args.observation_type,
|
||||
args.result_dir,
|
||||
test_all_meta,
|
||||
)
|
||||
test(args, test_file_list)
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Main process received KeyboardInterrupt.")
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error in main process: {e}", exc_info=True)
|
||||
signal_handler(signal.SIGTERM, None)
|
||||
finally:
|
||||
logger.info("Main process final cleanup...")
|
||||
for env in active_environments:
|
||||
if env is not None:
|
||||
try:
|
||||
logger.info("Closing environment in final cleanup...")
|
||||
env.close()
|
||||
logger.info("Environment closed successfully in final cleanup")
|
||||
except Exception as e:
|
||||
logger.error(f"Error during final environment cleanup: {e}")
|
||||
for p in processes:
|
||||
if p is not None and p.is_alive():
|
||||
try:
|
||||
logger.info(f"Terminating process {p.name}...")
|
||||
p.terminate()
|
||||
except Exception as e:
|
||||
logger.error(f"Error terminating process: {e}")
|
||||
time.sleep(1)
|
||||
for p in processes:
|
||||
if p is not None and p.is_alive():
|
||||
try:
|
||||
logger.info(f"Force killing process {p.name}...")
|
||||
os.kill(p.pid, signal.SIGKILL)
|
||||
logger.info(f"Process {p.name} force killed")
|
||||
except Exception as e:
|
||||
logger.error(f"Error force killing process: {e}")
|
||||
|
||||
|
||||
Loading…
Reference in New Issue