"""
|
|
Simple Baseline Agent for Reasoning Experiments
|
|
|
|
This is a very basic agent that:
|
|
1. Reads operator documentation
|
|
2. Looks at sample data
|
|
3. Generates a pipeline of operators using tool calls
|
|
4. Executes the pipeline and returns results
|
|
"""

import os
import json
import yaml
import argparse
import time
from pathlib import Path
from typing import Dict, List, Any, Optional, Literal
from pydantic import BaseModel, Field
from litellm import completion
from docetl.runner import DSLRunner
from experiments.reasoning.evaluation.utils import run_dataset_evaluation, get_evaluate_func, dataset_accuracy_metrics
from docetl.utils_evaluation import load_custom_evaluate_func as docetl_load_custom_evaluate_func
import modal
from experiments.reasoning.utils import app, volume, VOLUME_MOUNT_PATH, image


DEFAULT_MODEL = "gpt-5"
DEFAULT_OUTPUT_DIR = "outputs/simple_agent"


class AgentAction(BaseModel):
    """Schema for agent action decisions."""

    action: Literal["try_pipeline", "return_pipeline"] = Field(
        ..., description="The action to take"
    )
    reasoning: str = Field(
        ..., description="Explanation of why this action was chosen"
    )
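
# The LLM is prompted (see create_system_prompt below) to emit decisions in exactly
# this shape; the reasoning text here is illustrative:
#
#   {"action": "try_pipeline", "reasoning": "Test the baseline configuration first"}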


class PathResolver:
    """Handles path resolution for local and Modal environments."""

    @staticmethod
    def resolve_in_volume(path: str | None) -> str | None:
        """Resolve a relative path against the Modal volume mount; pass through absolute paths."""
        if path is None:
            return None
        p = Path(path)
        if p.is_absolute():
            return str(p)
        return str((Path(VOLUME_MOUNT_PATH) / p).resolve())

    @staticmethod
    def get_data_path(dataset: str) -> Path:
        """Get the training data path for a dataset."""
        path_map = {
            "cuad": "experiments/reasoning/data/train/cuad.json",
            "blackvault": "experiments/reasoning/data/train/blackvault.json",
            "game_reviews": "experiments/reasoning/data/train/game_reviews.json",
            "sustainability": "experiments/reasoning/data/train/sustainability.json",
            "biodex": "experiments/reasoning/data/train/biodex.json",
            "medec": "experiments/reasoning/data/train/medec.json",
            "facility": "experiments/reasoning/data/train/facility.json"
        }

        data_path = Path(path_map.get(dataset.lower(), f"experiments/reasoning/data/train/{dataset.lower()}.json"))

        # Try the volume mount if the original path doesn't exist
        if not data_path.exists():
            data_path = Path(VOLUME_MOUNT_PATH) / data_path

        return data_path


class PipelineExecutor:
    """Handles pipeline creation and execution."""

    def __init__(self, experiment_dir: Path):
        self.experiment_dir = experiment_dir

    def create_yaml(self, operators: List[Dict], dataset: str, prefix: str = "pipeline") -> str:
        """Write a DocETL pipeline YAML for the given operators and return its path."""
        dataset_file = f"experiments/reasoning/data/train/{dataset.lower()}.json"
        output_json = self.experiment_dir / f"{prefix}_output.json"

        # Add the azure/ or gemini/ prefix to model names where missing
        for op in operators:
            if "model" in op and "gpt" in str(op["model"]) and not str(op["model"]).startswith("azure/"):
                op["model"] = "azure/" + str(op["model"])
            if "model" in op and "gemini" in str(op["model"]) and not str(op["model"]).startswith("gemini/"):
                op["model"] = "gemini/" + str(op["model"])

        config = {
            "bypass_cache": True,
            "datasets": {
                "input_data": {"type": "file", "path": dataset_file}
            },
            "default_model": "azure/gpt-4o-mini",
            "operations": operators,
            "pipeline": {
                "steps": [{
                    "name": f"{dataset}_pipeline",
                    "input": "input_data",
                    "operations": [op["name"] for op in operators]
                }],
                "output": {"type": "file", "path": str(output_json)}
            }
        }

        yaml_path = self.experiment_dir / f"{prefix}.yaml"
        with open(yaml_path, 'w') as f:
            yaml.dump(config, f, sort_keys=False)

        return str(yaml_path)
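
    # For reference, the YAML written above has roughly this shape (paths and the
    # dataset name are illustrative):
    #
    #   bypass_cache: true
    #   datasets:
    #     input_data: {type: file, path: experiments/reasoning/data/train/cuad.json}
    #   default_model: azure/gpt-4o-mini
    #   operations: [...]
    #   pipeline:
    #     steps:
    #       - {name: cuad_pipeline, input: input_data, operations: [...]}
    #     output: {type: file, path: <experiment_dir>/pipeline_output.json}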

    def rewrite_for_modal(self, yaml_path: str) -> str:
        """Rewrite pipeline YAML for Modal volume mounting."""
        if not Path(VOLUME_MOUNT_PATH).exists():
            return yaml_path

        with open(yaml_path, "r") as f:
            cfg = yaml.safe_load(f)

        base_mount = Path(VOLUME_MOUNT_PATH)
        output_root = base_mount / "outputs" / self.experiment_dir.name

        # Update output paths
        pipeline_cfg = cfg.get("pipeline", {})
        if isinstance(pipeline_cfg, dict):
            out = pipeline_cfg.get("output")
            if isinstance(out, dict) and isinstance(out.get("path"), str):
                original_name = Path(out["path"]).name
                output_root.mkdir(parents=True, exist_ok=True)
                out["path"] = str(output_root / original_name)
                if isinstance(out.get("intermediate_dir"), str):
                    out["intermediate_dir"] = str(output_root / "intermediates")

        # Save rewritten YAML
        tmp_dir = base_mount / "tmp"
        tmp_dir.mkdir(parents=True, exist_ok=True)
        new_yaml_path = tmp_dir / f"{Path(yaml_path).stem}_modal.yaml"
        with open(new_yaml_path, "w") as f:
            yaml.safe_dump(cfg, f, sort_keys=False)

        return str(new_yaml_path)

    def execute(self, operators: List[Dict], dataset: str, prefix: str = "test_pipeline") -> Dict[str, Any]:
        """Execute a pipeline and return results."""
        try:
            yaml_path = self.create_yaml(operators, dataset, prefix)
            yaml_path = self.rewrite_for_modal(yaml_path)

            runner = DSLRunner.from_yaml(yaml_path)
            runner.load()

            if runner.last_op_container:
                data, _, _ = runner.last_op_container.next()
                runner.save(data)

            # Get the output path and sample outputs
            with open(yaml_path, 'r') as f:
                config = yaml.safe_load(f)
            output_path = config['pipeline']['output']['path']

            sample_outputs = []
            try:
                with open(output_path, 'r') as f:
                    outputs = json.load(f)
                sample_outputs = outputs[:3] if isinstance(outputs, list) else [outputs]
            except Exception:
                pass

            return {
                "success": True,
                "cost": runner.total_cost,
                "sample_outputs": sample_outputs,
                "error": None,
                "output_path": output_path,
                "yaml_path": yaml_path
            }

        except Exception as e:
            return {
                "success": False,
                "cost": 0.0,
                "sample_outputs": [],
                "error": str(e),
                "output_path": None,
                "yaml_path": yaml_path if 'yaml_path' in locals() else None
            }


class DataAnalyzer:
    """Handles data analysis and documentation loading."""

    @staticmethod
    def load_documentation(doc_path: str = "experiments/reasoning/data/operators_documentation.txt") -> str:
        """Load operator documentation."""
        doc_file = Path(doc_path)
        if not doc_file.exists():
            doc_file = Path(VOLUME_MOUNT_PATH) / doc_path

        with open(doc_file, 'r') as f:
            return f.read()

    @staticmethod
    def load_sample_data(dataset: str, limit: int = 5) -> List[Dict]:
        """Load sample data for analysis."""
        data_path = PathResolver.get_data_path(dataset)

        if not data_path.exists():
            return []

        try:
            with open(data_path, 'r') as f:
                data = json.load(f)
            return data[:limit] if isinstance(data, list) else [data]
        except Exception:
            return []

    @staticmethod
    def analyze_sample_data(sample_data: List[Dict]) -> str:
        """Analyze sample data structure."""
        if not sample_data:
            return "No sample data provided"

        first_sample = sample_data[0]
        analysis = [
            f"Number of samples: {len(sample_data)}",
            f"Keys in first sample: {list(first_sample.keys())}"
        ]

        # Analyze field types
        for key, value in first_sample.items():
            if isinstance(value, str):
                analysis.append(f"- {key}: string (length: {len(value)})")
            elif isinstance(value, list):
                analysis.append(f"- {key}: list (length: {len(value)})")
            elif isinstance(value, dict):
                analysis.append(f"- {key}: dict (keys: {list(value.keys())})")
            else:
                analysis.append(f"- {key}: {type(value).__name__}")

        # Add a truncated sample
        sample_json = json.dumps(first_sample, indent=2)
        if len(sample_json) > 2000:
            sample_json = sample_json[:2000] + "\n... (truncated)"
        analysis.append(f"\nFirst sample content:\n{sample_json}")

        return "\n".join(analysis)


class AgentCommunicator:
    """Handles LLM communication and JSON parsing."""

    def __init__(self, model: str, agent=None):
        self.model = model
        self.agent = agent  # Reference to parent agent for cost tracking

    def safe_json_parse(self, response_content: str, fallback: Dict = None) -> Dict:
        """Safely parse JSON response with fallback."""
        try:
            return json.loads(response_content)
        except Exception:
            return fallback or {}

    def get_action_decision(self, messages: List[Dict]) -> Optional[AgentAction]:
        """Get agent action decision."""
        try:
            response = completion(
                model=self.model,
                messages=messages,
                response_format={"type": "json_object"}
            )

            if self.agent and hasattr(response, '_hidden_params') and 'response_cost' in response._hidden_params:
                call_cost = response._hidden_params["response_cost"]
                print(f"💰 Adding LLM call cost: ${call_cost:.4f} (total before: ${self.agent.total_search_cost:.4f})")
                self.agent.total_search_cost += call_cost

            decision_json = self.safe_json_parse(response.choices[0].message.content)
            return AgentAction(**decision_json)
        except Exception:
            return None

    def get_operators(self, messages: List[Dict], request_msg: str) -> List[Dict]:
        """Get operators from agent."""
        try:
            response = completion(
                model=self.model,
                messages=messages + [{"role": "user", "content": request_msg}],
                response_format={"type": "json_object"}
            )

            if self.agent and hasattr(response, '_hidden_params') and 'response_cost' in response._hidden_params:
                call_cost = response._hidden_params["response_cost"]
                print(f"💰 Adding LLM call cost: ${call_cost:.4f} (total before: ${self.agent.total_search_cost:.4f})")
                self.agent.total_search_cost += call_cost

            result = self.safe_json_parse(response.choices[0].message.content)
            return result.get("operators", [])
        except Exception:
            return []
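
# For reference, get_operators expects the LLM reply to look like the sketch below;
# the operator fields shown are illustrative, and the full set of valid fields comes
# from the DocETL operator documentation loaded at runtime:
#
#   {"operators": [{"name": "extract_clauses", "type": "map",
#                   "prompt": "...", "model": "azure/gpt-4o-mini", ...}]}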


class SimpleBaselineAgent:
    """Simplified baseline agent that uses tool calling to generate pipelines."""

    def __init__(self, model: str = DEFAULT_MODEL, available_models: List[str] = None,
                 custom_evaluate_func=None, custom_metric_key: str = None):
        self.model = model
        self.communicator = AgentCommunicator(model, self)
        self.documentation = None
        self.original_yaml = None
        self.original_config = None
        self.total_search_cost = 0.0
        self.start_time = None
        self.end_time = None
        self.custom_evaluate_func = custom_evaluate_func
        self.custom_metric_key = custom_metric_key

        # Set available models, with a default list if not provided
        if available_models is None:
            self.available_models = [
                "gpt-5",
                "gpt-5-mini",
                "gpt-5-nano",
                "gpt-4.1",
                "gpt-4.1-mini",
                "gpt-4.1-nano",
                "gpt-4o",
                "gpt-4o-mini",
                "gemini-2.5-pro",
                "gemini-2.5-flash",
                "gemini-2.5-flash-lite"
            ]
        else:
            self.available_models = available_models

    def load_resources(self, dataset: str, yaml_path: str = None):
        """Load documentation and the original pipeline."""
        self.documentation = DataAnalyzer.load_documentation()

        # Load the original pipeline
        if yaml_path:
            pipeline_path = Path(yaml_path)
        else:
            pipeline_path = Path(f"experiments/reasoning/pipelines/{dataset.lower()}.yaml")
            if not pipeline_path.exists():
                pipeline_path = Path(VOLUME_MOUNT_PATH) / f"experiments/reasoning/pipelines/{dataset.lower()}.yaml"

        if pipeline_path.exists():
            with open(pipeline_path, 'r') as f:
                self.original_yaml = f.read()
            self.original_config = yaml.safe_load(self.original_yaml)

    def _run_individual_evaluation(self, dataset: str, output_path: str, iteration_id: int) -> Dict[str, Any]:
        """Run evaluation for a single pipeline output and return formatted metrics."""
        try:
            # Use the custom evaluation function if provided, otherwise use the default
            if self.custom_evaluate_func is not None:
                evaluate_func = self.custom_evaluate_func
                metric_key = self.custom_metric_key
            else:
                evaluate_func = get_evaluate_func(dataset)
                metric_key = dataset_accuracy_metrics.get(dataset.lower(), "accuracy")

            if not evaluate_func:
                return {"accuracy_msg": "No evaluation function available", "accuracy_val": None, "eval_metrics": {}}

            # Resolve the output path for the Modal environment if needed
            resolved_output_path = output_path
            if not Path(output_path).exists() and Path(VOLUME_MOUNT_PATH).exists():
                # Try to find the file in the volume mount
                potential_path = Path(VOLUME_MOUNT_PATH) / Path(output_path).name
                if potential_path.exists():
                    resolved_output_path = str(potential_path)

            # Call the evaluation function with the expected parameters
            eval_metrics = evaluate_func(f"simple_agent_iter_{iteration_id}", resolved_output_path)

            accuracy_val = eval_metrics.get(metric_key, 0.0) if metric_key else next((v for v in eval_metrics.values() if isinstance(v, (int, float))), 0.0)

            # Format the accuracy message based on the dataset
            if dataset.lower() == "cuad":
                accuracy_msg = f"F1: {accuracy_val:.4f}, Precision: {eval_metrics.get('avg_precision', 0):.4f}, Recall: {eval_metrics.get('avg_recall', 0):.4f}"
            elif dataset.lower() == "blackvault":
                accuracy_msg = f"Avg Distinct Locations: {accuracy_val:.4f}, Total Docs: {eval_metrics.get('total_documents', 0)}"
            elif dataset.lower() == "biodex":
                accuracy_msg = f"Avg RP@10: {accuracy_val:.4f}, Avg RP@5: {eval_metrics.get('avg_rp_at_5', 0):.4f}, Term Recall: {eval_metrics.get('avg_term_recall', 0):.4f}"
            elif dataset.lower() == "sustainability":
                accuracy_msg = f"Economic Activity Acc: {accuracy_val:.4f}, Company Name Acc: {eval_metrics.get('company_name_accuracy', 0):.4f}"
            elif dataset.lower() == "facility":
                accuracy_msg = f"Combined Score: {accuracy_val:.4f}, Urgency: {eval_metrics.get('urgency_accuracy', 0):.4f}, Sentiment: {eval_metrics.get('sentiment_accuracy', 0):.4f}, Categories: {eval_metrics.get('categories_accuracy', 0):.4f}"
            else:
                accuracy_msg = f"Accuracy: {accuracy_val:.4f}"

            return {
                "accuracy_msg": accuracy_msg,
                "accuracy_val": accuracy_val,
                "eval_metrics": eval_metrics
            }

        except Exception as e:
            print(f"❌ Evaluation error: {e}")
            return {
                "accuracy_msg": f"Evaluation failed: {str(e)}",
                "accuracy_val": None,
                "eval_metrics": {}
            }

    def create_system_prompt(self, dataset: str, baseline_cost: float = None, baseline_accuracy: float = None) -> str:
        """Create the system prompt for the agent."""

        # Evaluation metrics explanation for each dataset
        dataset_metrics_info = {
            "cuad": "F1 score, Precision, and Recall for legal clause extraction",
            "blackvault": "Average distinct locations per document for UFO sighting analysis",
            "biodex": "Rank Precision at 5/10 and Term Recall for biochemical reaction prediction",
            "sustainability": "Economic activity accuracy and company name accuracy for sustainability analysis",
            "game_reviews": "Combined accuracy score for game review sentiment analysis",
            "medec": "Combined score for medical entity classification",
            "facility": "Combined score (urgency, sentiment, and categories accuracy) for facility support message classification"
        }

        metrics_info = dataset_metrics_info.get(dataset.lower(), "Accuracy metrics specific to the dataset")

        return f"""You are a pipeline optimization agent that improves DocETL data processing pipelines.

You must always respond with valid JSON. You have access to the following actions:
1. try_pipeline: Test a pipeline configuration and see the results (cost, accuracy, and sample outputs)
2. return_pipeline: Return the final optimized pipeline configuration

Always respond in JSON format with:
{{"action": "try_pipeline", "reasoning": "explanation of why you want to test this pipeline"}}
OR
{{"action": "return_pipeline", "reasoning": "explanation of why this is the final pipeline"}}

When asked for operators, respond with:
{{"operators": [list of operator dictionaries]}}

AVAILABLE OPERATORS DOCUMENTATION:
{self.documentation}

CURRENT PIPELINE (baseline to improve upon):
{self.original_yaml}

BASELINE RESULTS:
- Cost: ${baseline_cost if baseline_cost is not None else 'N/A'}
- Accuracy: {baseline_accuracy if baseline_accuracy is not None else 'N/A'}

EVALUATION METRICS:
Your pipeline results will be evaluated using: {metrics_info}
Each pipeline test will provide detailed evaluation metrics to help you optimize.

YOUR TASK: Improve the pipeline's accuracy by optimizing operators, prompts, models, or adding new operations.

OPTIMIZATION STRATEGIES:
1. **Prompt Engineering**: Refine operator prompts for better extraction/classification
2. **Model Selection**: Try different models from the available list for better performance
3. **Operator Addition**: Add preprocessing, filtering, or post-processing operators
4. **Jinja Templating**: Use flexible templating to read more/less context from documents

AVAILABLE MODELS (use with 'azure/' or 'gemini/' prefix as appropriate):
{chr(10).join(f"  - azure/{model}" if model.startswith("gpt") else f"  - gemini/{model}" if model.startswith("gemini") else f"  - {model}" for model in self.available_models)}

Your goal is to beat the baseline accuracy of {baseline_accuracy if baseline_accuracy is not None else 'N/A'}."""

    def run_agent_loop(self, dataset: str, experiment_dir: Path, ground_truth_path: str = None,
                       baseline_cost: float = None, baseline_accuracy: float = None,
                       baseline_operators: List[Dict] = None, max_iterations: int = 10,
                       all_iteration_results: Optional[List[Dict]] = None, iteration_counter: int = 1) -> tuple[List[Dict], int]:
        """Run the agent optimization loop."""

        sample_data = DataAnalyzer.load_sample_data(dataset)
        data_analysis = DataAnalyzer.analyze_sample_data(sample_data)
        executor = PipelineExecutor(experiment_dir)

        # Initialize the conversation
        messages = [
            {"role": "system", "content": self.create_system_prompt(dataset, baseline_cost, baseline_accuracy)},
            {"role": "user", "content": f"""Dataset: {dataset}

DATA ANALYSIS:
{data_analysis}

Task: Generate a pipeline to process this {dataset} data. You can see the original pipeline YAML above as a baseline.

You should:
1. First try the original pipeline configuration to establish a baseline
2. Then try to improve it if possible
3. Return your best pipeline configuration

Start by trying the original pipeline."""}
        ]

        # Track best results
        best_pipeline = baseline_operators or []
        best_cost = baseline_cost or float('inf')
        best_accuracy = baseline_accuracy or -1.0

        for iteration in range(max_iterations):
            print(f"\n🔄 Agent iteration {iteration + 1}/{max_iterations}")

            # Get the agent's decision
            decision = self.communicator.get_action_decision(messages)
            if not decision:
                continue

            # Print the agent's decision and reasoning
            print(f"🤖 Agent Decision: {decision.action}")
            print(f"💭 Reasoning: {decision.reasoning}")

            messages.append({"role": "assistant", "content": json.dumps(decision.model_dump())})

            if decision.action == "try_pipeline":
                operators = self.communicator.get_operators(
                    messages,
                    "Provide the operators for the pipeline you want to try. Return JSON in format: {\"operators\": [...]}"
                )

                if not operators and self.original_config:
                    operators = self.original_config.get('operations', [])

                # Execute and evaluate
                result = self._test_pipeline(operators, dataset, executor, iteration_counter,
                                             ground_truth_path, experiment_dir, all_iteration_results)
                iteration_counter += 1

                # Update the best pipeline
                if result["success"] and result.get("accuracy_val") is not None:
                    if result["accuracy_val"] > best_accuracy + 1e-6:
                        best_accuracy = result["accuracy_val"]
                        best_cost = result["cost"]
                        best_pipeline = operators

                # Add the result to the conversation
                formatted_result = self._format_test_result(result)
                messages.append({"role": "user", "content": formatted_result})

                # Print iteration results
                print(f"\n📋 Iteration {iteration + 1} Results:")
                print(formatted_result)

            elif decision.action == "return_pipeline":
                final_operators = self.communicator.get_operators(
                    messages,
                    "Provide the final operators for the pipeline you want to return. Return JSON in format: {\"operators\": [...]}"
                )

                return final_operators or best_pipeline, iteration_counter

        return best_pipeline or (self.original_config.get('operations', []) if self.original_config else []), iteration_counter

    def _test_pipeline(self, operators: List[Dict], dataset: str, executor: PipelineExecutor,
                       iteration_id: int, ground_truth_path: str, experiment_dir: Path,
                       all_iteration_results: List[Dict]) -> Dict[str, Any]:
        """Test a pipeline and return results with evaluation."""

        print(f"🧪 Testing pipeline with {len(operators)} operators...")

        test_result = executor.execute(operators, dataset, f"iteration_{iteration_id}")

        # Initialize evaluation results
        accuracy_msg = "N/A"
        accuracy_val = None
        eval_metrics = {}

        # Run evaluation immediately after pipeline execution
        if test_result["success"] and test_result.get("output_path"):
            eval_result = self._run_individual_evaluation(dataset, test_result["output_path"], iteration_id)
            accuracy_msg = eval_result["accuracy_msg"]
            accuracy_val = eval_result["accuracy_val"]
            eval_metrics = eval_result["eval_metrics"]

            # Add to iteration results for batch evaluation later
            if all_iteration_results is not None:
                all_iteration_results.append({
                    "file_path": test_result["output_path"],
                    "cost": test_result["cost"],
                    "node_id": str(iteration_id)
                })

        return {
            **test_result,
            "accuracy_msg": accuracy_msg,
            "accuracy_val": accuracy_val,
            "eval_metrics": eval_metrics
        }

    def _format_test_result(self, result: Dict) -> str:
        """Format test result for agent feedback."""
        eval_metrics = result.get('eval_metrics', {})
        sample_outputs = result.get('sample_outputs', [])

        # Base message
        message_parts = [
            "Pipeline test results:",
            f"- Success: {result['success']}",
            f"- Cost: ${result['cost']:.4f}",
            f"- Accuracy: {result['accuracy_msg']}",
            f"- Error: {result.get('error', 'None')}"
        ]

        # Add sample outputs with actual content
        if sample_outputs:
            message_parts.append(f"- Sample outputs ({len(sample_outputs)} items):")
            for i, output in enumerate(sample_outputs[:3]):  # Show up to 3 samples
                if isinstance(output, dict):
                    # Truncate the output for readability
                    output_str = json.dumps(output, indent=2)
                    if len(output_str) > 800:
                        output_str = output_str[:800] + "\n... (truncated)"
                    message_parts.append(f"  Sample {i+1}:")
                    # Indent each line of the output
                    for line in output_str.split('\n'):
                        message_parts.append(f"    {line}")
                else:
                    # Handle non-dict outputs
                    output_str = str(output)
                    if len(output_str) > 400:
                        output_str = output_str[:400] + "... (truncated)"
                    message_parts.append(f"  Sample {i+1}: {output_str}")
        else:
            message_parts.append("- Sample outputs: No outputs generated")

        # Add detailed evaluation metrics if available
        if eval_metrics:
            message_parts.append("\nDetailed Evaluation Metrics:")
            for key, value in eval_metrics.items():
                if isinstance(value, (int, float)):
                    if key.startswith('avg_') or 'accuracy' in key.lower() or 'precision' in key.lower() or 'recall' in key.lower():
                        message_parts.append(f"- {key}: {value:.4f}")
                    else:
                        message_parts.append(f"- {key}: {value}")
                elif isinstance(value, list) and len(value) <= 5:
                    message_parts.append(f"- {key}: {value}")
                elif not isinstance(value, (list, dict)):
                    message_parts.append(f"- {key}: {value}")

        message_parts.append("\nBased on these results, you can either try another pipeline configuration or return the best one you've found.")

        return "\n".join(message_parts)


def run_simple_agent_experiment(dataset: str, output_dir: str = None, model: str = DEFAULT_MODEL,
                                experiment_name: str = None, ground_truth_path: str = None,
                                original_query_result: Dict[str, Any] | None = None,
                                yaml_path: str = None, available_models: List[str] | None = None,
                                accuracy_function: str | None = None, accuracy_metric_key: str | None = None) -> Dict[str, Any]:
    """Run the simple agent experiment for a dataset."""

    if output_dir is None:
        output_dir = os.environ.get('EXPERIMENT_OUTPUT_DIR', DEFAULT_OUTPUT_DIR)
    if experiment_name is None:
        experiment_name = f"simple_agent_{dataset}"

    exp_dir = Path(output_dir) / experiment_name
    exp_dir.mkdir(parents=True, exist_ok=True)

    print("🚀 Running Simple Baseline Experiment")
    print(f"Dataset: {dataset}, Model: {model}, Output: {exp_dir}")

    # Load the custom accuracy function if provided
    custom_evaluate_func = None
    if accuracy_function:
        if not accuracy_metric_key:
            raise ValueError("--accuracy_metric_key must be provided when using --accuracy_function")
        print(f"📊 Loading custom accuracy function from: {accuracy_function}")

        # Infer the dataset path from yaml_path or use a default
        dataset_file_path = None
        if yaml_path:
            # Try to infer the dataset path from the yaml config
            try:
                with open(yaml_path, 'r') as f:
                    yaml_config = yaml.safe_load(f)
                datasets = yaml_config.get('datasets', {})
                if datasets:
                    # Get the first dataset path
                    first_dataset = next(iter(datasets.values()))
                    if isinstance(first_dataset, dict) and 'path' in first_dataset:
                        dataset_file_path = first_dataset['path']
            except Exception:
                pass

        # If we couldn't infer it, try to construct it from the dataset name
        if not dataset_file_path:
            # Try common paths for experiment datasets
            potential_paths = [
                f"experiments/reasoning/data/train/{dataset.lower()}.json",
                f"experiments/reasoning/data/test/{dataset.lower()}.json",
            ]
            for path in potential_paths:
                if Path(path).exists():
                    dataset_file_path = path
                    break

        if not dataset_file_path:
            # Use a placeholder - docetl's load_custom_evaluate_func requires a path
            # even though the custom function might not use it
            dataset_file_path = f"experiments/reasoning/data/train/{dataset.lower()}.json"

        # Load using docetl's version, which returns a (results_file_path) -> dict callable
        docetl_eval_func = docetl_load_custom_evaluate_func(accuracy_function, dataset_file_path)

        # Wrap to match the experiments' expected signature: (method_name, results_file_path) -> dict
        def wrapped_eval_func(method_name: str, results_file_path: str):
            # Ignore method_name; just call docetl's function
            return docetl_eval_func(results_file_path)

        custom_evaluate_func = wrapped_eval_func
        print(f"✅ Custom accuracy function loaded. Metric key: {accuracy_metric_key}")

    # Initialize the agent and executor
    agent = SimpleBaselineAgent(model=model, available_models=available_models,
                                custom_evaluate_func=custom_evaluate_func, custom_metric_key=accuracy_metric_key)
    agent.start_time = time.time()  # Track experiment start time
    agent.load_resources(dataset, yaml_path)
    executor = PipelineExecutor(exp_dir)

    # Use the original query result if available, otherwise run the baseline
    all_iteration_results = []
    iteration_counter = 0
    baseline_accuracy = None

    # Get baseline operations for the agent loop
    baseline_ops = agent.original_config.get('operations', []) if agent.original_config else []

    if original_query_result and original_query_result["success"]:
        print("✅ Using pre-executed original query result")
        baseline_cost = original_query_result["cost"]

        # Copy the original output file to our experiment directory for consistency
        baseline_json_path = exp_dir / "baseline_output.json"
        if original_query_result["output_file_path"]:
            import shutil
            try:
                shutil.copy2(original_query_result["output_file_path"], baseline_json_path)

                # Run evaluation on the copied baseline
                baseline_eval = agent._run_individual_evaluation(dataset, str(baseline_json_path), 0)
                baseline_accuracy = baseline_eval["accuracy_val"]

                # Create a baseline result for consistency
                baseline_result_with_eval = {
                    "success": True,
                    "cost": baseline_cost,
                    "output_path": str(baseline_json_path),
                    "accuracy_msg": baseline_eval["accuracy_msg"],
                    "accuracy_val": baseline_eval["accuracy_val"],
                    "eval_metrics": baseline_eval["eval_metrics"]
                }

                # Format and display baseline results
                baseline_formatted = agent._format_test_result(baseline_result_with_eval)
                print("🏁 Baseline execution results:")
                print(baseline_formatted)

                all_iteration_results.append({
                    "file_path": str(baseline_json_path),
                    "cost": baseline_cost,
                    "node_id": str(iteration_counter)
                })
                iteration_counter += 1

            except Exception as e:
                print(f"⚠️ Could not copy/evaluate original output file: {e}")
                baseline_accuracy = None
                baseline_cost = original_query_result["cost"]
        else:
            baseline_cost = original_query_result["cost"]
            print(f"✅ Baseline cost: ${baseline_cost:.4f}")
    else:
        print("▶️ Running baseline - original query result not available")
        # Run the baseline as before
        baseline_result = executor.execute(baseline_ops, dataset, "baseline")

        if baseline_result["success"] and baseline_result.get("output_path"):
            # Run evaluation on the baseline
            baseline_eval = agent._run_individual_evaluation(dataset, baseline_result["output_path"], 0)
            baseline_accuracy = baseline_eval["accuracy_val"]

            # Update the baseline result with evaluation metrics for consistent formatting
            baseline_result_with_eval = {
                **baseline_result,
                "accuracy_msg": baseline_eval["accuracy_msg"],
                "accuracy_val": baseline_eval["accuracy_val"],
                "eval_metrics": baseline_eval["eval_metrics"]
            }

            # Format and display baseline results
            baseline_formatted = agent._format_test_result(baseline_result_with_eval)
            print("🏁 Baseline execution results:")
            print(baseline_formatted)

            all_iteration_results.append({
                "file_path": baseline_result["output_path"],
                "cost": baseline_result["cost"],
                "node_id": str(iteration_counter)
            })
            iteration_counter += 1
            baseline_cost = baseline_result["cost"]
        else:
            print(f"❌ Baseline execution failed: {baseline_result.get('error', 'Unknown error')}")
            baseline_cost = 0.0

    # Agent optimization loop
    operators, iteration_counter = agent.run_agent_loop(
        dataset=dataset,
        experiment_dir=exp_dir,
        ground_truth_path=ground_truth_path,
        baseline_cost=baseline_cost,
        baseline_accuracy=baseline_accuracy,
        baseline_operators=baseline_ops,
        all_iteration_results=all_iteration_results,
        iteration_counter=iteration_counter
    )

    # Execute the final pipeline
    print("🚀 Executing final pipeline...")
    final_yaml = executor.create_yaml(operators, dataset, "final_pipeline")
    final_yaml = executor.rewrite_for_modal(final_yaml)

    try:
        runner = DSLRunner.from_yaml(final_yaml)
        runner.load()

        if runner.last_op_container:
            data, _, _ = runner.last_op_container.next()
            runner.save(data)

        with open(final_yaml, 'r') as f:
            config = yaml.safe_load(f)
        output_path = config['pipeline']['output']['path']

        all_iteration_results.append({
            "file_path": output_path,
            "cost": runner.total_cost,
            "node_id": str(iteration_counter)
        })

        results = {
            "success": True,
            "cost": runner.total_cost,
            "output_file": output_path,
            "pipeline_yaml": final_yaml
        }
    except Exception as e:
        results = {
            "success": False,
            "error": str(e),
            "cost": 0.0,
            "pipeline_yaml": final_yaml
        }

    # Final evaluation
    if all_iteration_results:
        print(f"📊 Running evaluation on {len(all_iteration_results)} iterations...")
        eval_results, pareto_auc = run_dataset_evaluation(
            dataset=dataset,
            nodes_or_files=all_iteration_results,
            output_path=exp_dir,
            ground_truth_path=ground_truth_path,
            method_name="simple_agent",
            custom_evaluate_func=custom_evaluate_func,
            custom_metric_key=accuracy_metric_key,
        )
        results.update({"evaluation": eval_results, "pareto_auc": pareto_auc})

    # Track the experiment end time and compute total latency
    agent.end_time = time.time()
    total_latency = agent.end_time - agent.start_time if agent.start_time else 0.0

    # Save results
    results.update({
        "dataset": dataset,
        "operators": operators,
        "experiment_name": experiment_name,
        "total_search_cost": agent.total_search_cost,  # LLM call costs
        "total_cost": results.get("cost", 0.0) + agent.total_search_cost,  # Pipeline + LLM costs
        "total_latency": total_latency,
        "start_time": agent.start_time,
        "end_time": agent.end_time
    })

    return results
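
# Minimal programmatic usage sketch (dataset name and paths are illustrative; the
# keyword arguments mirror run_simple_agent_experiment's signature above):
#
#   results = run_simple_agent_experiment(
#       dataset="cuad",
#       output_dir="outputs/simple_agent",
#       model="gpt-5",
#   )
#   print(results["total_cost"], results.get("pareto_auc"))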


# Modal functions
@app.function(
    image=image,
    secrets=[modal.Secret.from_dotenv()],
    volumes={VOLUME_MOUNT_PATH: volume},
    timeout=60 * 60 * 2
)
def run_simple_agent_remote(dataset: str, output_dir: str = None, model: str = DEFAULT_MODEL,
                            experiment_name: str = None, ground_truth_path: str = None,
                            original_query_result: Dict[str, Any] | None = None,
                            yaml_path: str = None, available_models: List[str] | None = None,
                            accuracy_function: str | None = None, accuracy_metric_key: str | None = None):
    """Modal remote function for running the simple agent."""
    os.environ["EXPERIMENT_OUTPUT_DIR"] = str(Path(VOLUME_MOUNT_PATH) / "outputs")

    resolved_output_dir = PathResolver.resolve_in_volume(output_dir) if output_dir else None
    if resolved_output_dir is None:
        resolved_output_dir = os.environ["EXPERIMENT_OUTPUT_DIR"]

    results = run_simple_agent_experiment(
        dataset=dataset,
        output_dir=resolved_output_dir,
        model=model,
        experiment_name=experiment_name,
        ground_truth_path=ground_truth_path,
        original_query_result=original_query_result,
        yaml_path=yaml_path,
        available_models=available_models,
        accuracy_function=accuracy_function,
        accuracy_metric_key=accuracy_metric_key
    )

    volume.commit()
    return results


@app.local_entrypoint()
def modal_main_simple_agent(dataset: str, experiment_name: str | None = None,
                            output_dir: str | None = None, model: str = DEFAULT_MODEL,
                            ground_truth: str | None = None,
                            original_query_result: Dict[str, Any] | None = None,
                            available_models: List[str] | None = None,
                            accuracy_function: str | None = None, accuracy_metric_key: str | None = None):
    """Modal entrypoint for the simple agent."""
    run_simple_agent_remote.remote(
        dataset=dataset,
        output_dir=output_dir,
        model=model,
        experiment_name=experiment_name,
        ground_truth_path=ground_truth,
        original_query_result=original_query_result,
        available_models=available_models,
        accuracy_function=accuracy_function,
        accuracy_metric_key=accuracy_metric_key
    )
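
# Example Modal invocation (the module path is an assumption; point it at wherever
# this file lives in the repository):
#
#   modal run experiments/reasoning/simple_agent.py::modal_main_simple_agent \
#       --dataset cuad --model gpt-5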


def main():
    """Local main function."""
    parser = argparse.ArgumentParser(description="Run the simple baseline agent")
    parser.add_argument("--dataset", type=str, required=True,
                        help="Dataset name. Must be one of: cuad, game_reviews, blackvault, sustainability, biodex, medec, facility. Or any string if using --accuracy_function.")
    parser.add_argument("--model", type=str, default=DEFAULT_MODEL)
    parser.add_argument("--output_dir", type=str)
    parser.add_argument("--experiment_name", type=str)
    parser.add_argument("--ground_truth", type=str, help="Path to ground truth file")
    parser.add_argument("--available_models", type=str, nargs="+",
                        help="List of available models (default: all models). Example: --available_models gpt-5 gpt-5-mini gpt-4o")
    parser.add_argument("--accuracy_function", type=str,
                        help="Path to Python file containing custom evaluate_results function for user datasets")
    parser.add_argument("--accuracy_metric_key", type=str,
                        help="Key to extract from evaluation results dict for accuracy metric (required with --accuracy_function)")

    args = parser.parse_args()

    # Validate: if no custom accuracy function is given, the dataset must be a supported one
    if not args.accuracy_function:
        supported_datasets = {"cuad", "game_reviews", "blackvault", "sustainability", "biodex", "medec", "facility"}
        if args.dataset.lower() not in supported_datasets:
            parser.error(f"Dataset '{args.dataset}' is not supported. Use --accuracy_function for custom datasets, or choose from: {', '.join(sorted(supported_datasets))}")

    results = run_simple_agent_experiment(
        dataset=args.dataset,
        output_dir=args.output_dir,
        model=args.model,
        experiment_name=args.experiment_name,
        ground_truth_path=args.ground_truth,
        available_models=args.available_models,
        accuracy_function=args.accuracy_function,
        accuracy_metric_key=args.accuracy_metric_key
    )

    if results["success"]:
        print(f"✅ Success! Pipeline cost: ${results['cost']:.4f}")
        print(f"💰 LLM call cost: ${results.get('total_search_cost', 0.0):.4f}")
        print(f"💰 Total cost: ${results.get('total_cost', 0.0):.4f}")
        print(f"⏱️ Total latency: {results.get('total_latency', 0.0):.2f} seconds ({results.get('total_latency', 0.0)/60:.2f} minutes)")
        if "evaluation" in results:
            print("Evaluation metrics saved")
    else:
        print(f"❌ Failed: {results.get('error', 'Unknown error')}")


if __name__ == "__main__":
    main()