Compare commits

1 Commits

| Author | SHA1 | Date |
|---|---|---|
|  | 0b5859fa33 |  |
@@ -9,6 +9,34 @@ from docetl.runner import DSLRunner

app = typer.Typer(pretty_exceptions_enable=False)

#
# MCP subcommands
#
mcp_app = typer.Typer(
    pretty_exceptions_enable=False, help="Model Context Protocol (MCP) server utilities"
)


@mcp_app.command("serve")
def mcp_serve():
    """
    Start the DocETL MCP server over stdio.

    Requires the optional 'mcp' extra. Install via: pip install "docetl[mcp]"
    """
    try:
        from docetl_mcp.server import main as _mcp_main  # type: ignore
    except Exception as e:
        typer.echo(
            "DocETL MCP server requires the 'mcp' extra and its dependencies.\n"
            'Install with:\n pip install "docetl[mcp]"\n\n'
            f"Error: {e}"
        )
        raise typer.Exit(code=1)
    _mcp_main()


app.add_typer(mcp_app, name="mcp")


@app.command()
def build(
@@ -0,0 +1,5 @@
"""
DocETL MCP server package.
"""
@@ -0,0 +1,628 @@
import asyncio
import glob
import json
import os
from dataclasses import asdict
from typing import Any, Dict, Iterable, List, Tuple

import yaml

from docetl.runner import DSLRunner
from docetl.operations import get_operations
from docetl.parsing_tools import get_parsing_tools

# Optional imports guarded at runtime
try:
    import fitz  # PyMuPDF
except Exception:  # pragma: no cover - optional dependency
    fitz = None  # type: ignore

try:
    from mcp.server import Server
    from mcp.server.stdio import stdio_server
    from mcp.types import TextContent
except Exception as e:  # pragma: no cover - server won't start without mcp installed
    raise RuntimeError(
        "The 'mcp' package is required to run the DocETL MCP server. "
        "Install with: pip install mcp"
    ) from e


server = Server("docetl-mcp")


# --------------- Utility helpers ---------------

DEFAULT_INCLUDE_GLOBS = [
    "**/*.txt",
    "**/*.md",
    "**/*.pdf",
    "**/*.docx",
    "**/*.pptx",
    "**/*.xlsx",
]

SUPPORTED_EXTS = {".txt", ".md", ".pdf", ".docx", ".pptx", ".xlsx"}


def _resolve_abs(path: str) -> str:
    return path if os.path.isabs(path) else os.path.abspath(path)


def _collect_files(
    directory: str, include: List[str] | None, exclude: List[str] | None
) -> List[str]:
    base = _resolve_abs(directory)
    patterns = include or DEFAULT_INCLUDE_GLOBS
    exclude = exclude or []

    files: List[str] = []
    for pattern in patterns:
        files.extend(glob.glob(os.path.join(base, pattern), recursive=True))

    # De-duplicate and filter by supported extensions
    files = sorted(
        {f for f in files if os.path.isfile(f) and os.path.splitext(f)[1].lower() in SUPPORTED_EXTS}
    )

    # Apply exclude patterns
    if exclude:
        excluded: set[str] = set()
        for pattern in exclude:
            excluded.update(glob.glob(os.path.join(base, pattern), recursive=True))
        files = [f for f in files if f not in excluded]

    return files


def _read_text(path: str) -> str:
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


def _read_pdf_text(path: str, doc_per_page: bool) -> List[str]:
    if fitz is None:
        # No PyMuPDF available; fall back to single record with placeholder
        return [f"[PDF parsing unavailable: install docetl[parsing]] Path: {path}"]
    texts: List[str] = []
    with fitz.open(path) as doc:
        if doc_per_page:
            for idx, page in enumerate(doc):
                texts.append(f"Page {idx+1}:\n{page.get_text() or ''}")
        else:
            buf = []
            for idx, page in enumerate(doc):
                buf.append(f"Page {idx+1}:\n{page.get_text() or ''}")
            texts.append("\n\n".join(buf))
    return texts


def _ensure_dir(path: str) -> None:
    out_dir = os.path.dirname(path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)


def _dump_json(path: str, data: Any) -> None:
    _ensure_dir(path)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)


def _try_load_yaml_or_json(
    yaml_path: str | None = None, yaml_text: str | None = None, config: dict | None = None
) -> dict:
    if config is not None:
        return config
    if yaml_text is not None:
        return yaml.safe_load(yaml_text)
    if yaml_path is not None:
        with open(_resolve_abs(yaml_path), "r", encoding="utf-8") as f:
            return yaml.safe_load(f)
    raise ValueError("Provide one of: config (dict), yaml (str), or yaml_path (str)")


def _ok(payload: dict) -> List[TextContent]:
    return [TextContent(type="text", text=json.dumps(payload, indent=2))]


def _error(message: str, extra: dict | None = None) -> List[TextContent]:
    payload = {"error": message}
    if extra:
        payload.update(extra)
    return [TextContent(type="text", text=json.dumps(payload, indent=2))]


# --------------- Tools ---------------
@server.tool(
    "dataset.create_from_directory",
    "Create a DocETL dataset JSON from files in a directory.",
    {
        "type": "object",
        "properties": {
            "directory": {"type": "string"},
            "include": {"type": "array", "items": {"type": "string"}},
            "exclude": {"type": "array", "items": {"type": "string"}},
            "mode": {"type": "string", "enum": ["eager", "lazy"], "default": "eager"},
            "doc_per_page": {"type": "boolean", "default": False},
            "pdf_ocr": {"type": "boolean", "default": False},
            "output_path": {"type": "string"},
            "include_metadata": {"type": "boolean", "default": True},
        },
        "required": ["directory", "output_path"],
        "additionalProperties": False,
    },
)
async def dataset_create_from_directory(
    directory: str,
    output_path: str,
    include: List[str] | None = None,
    exclude: List[str] | None = None,
    mode: str = "eager",
    doc_per_page: bool = False,
    pdf_ocr: bool = False,
    include_metadata: bool = True,
):
    directory = _resolve_abs(directory)
    output_path = _resolve_abs(output_path)
    files = _collect_files(directory, include, exclude)

    if not files:
        return _error("No files found for given patterns", {"directory": directory})

    records: List[dict] = []
    suggested_parsing: List[dict] = []

    # Suggested parsing only when in lazy mode
    if mode == "lazy":
        # Map extensions to parser names and kwargs
        lazy_map: Dict[str, Tuple[str, Dict[str, Any]]] = {
            ".txt": ("txt_to_string", {}),
            ".md": ("txt_to_string", {}),
            ".docx": ("docx_to_string", {}),
            ".pptx": ("pptx_to_string", {}),
            ".xlsx": ("xlsx_to_string", {}),
            ".pdf": ("paddleocr_pdf_to_string", {"doc_per_page": doc_per_page, "ocr_enabled": pdf_ocr}),
        }

        for f in files:
            _, ext = os.path.splitext(f)
            ext = ext.lower()
            rec = {"file_path": f, "ext": ext.lstrip(".")}
            records.append(rec)
        # One consolidated suggestion per extension present
        seen_exts = {os.path.splitext(f)[1].lower() for f in files}
        for ext in sorted(seen_exts):
            if ext in lazy_map:
                function, kwargs = lazy_map[ext]
                suggested_parsing.append(
                    {
                        "function": function,
                        "function_kwargs": kwargs,
                        "input_key": "file_path",
                        "output_key": "src",
                    }
                )
    else:
        # Eager mode: create records with extracted text for a subset of formats.
        for f in files:
            _, ext = os.path.splitext(f)
            ext = ext.lower()
            if ext in {".txt", ".md"}:
                texts = [_read_text(f)]
            elif ext == ".pdf":
                texts = _read_pdf_text(f, doc_per_page=doc_per_page)
            else:
                # Unsupported eager extraction → fall back to lazy-like record (file_path only)
                texts = []

            if texts:
                for text in texts:
                    rec = {"src": text}
                    if include_metadata:
                        rec.update({"path": f, "ext": ext.lstrip(".")})
                    records.append(rec)
            else:
                # If we cannot eagerly parse, at least include as a file reference
                records.append({"file_path": f, "ext": ext.lstrip(".")})

    _dump_json(output_path, records)

    sample_preview = records[: min(3, len(records))]
    payload: Dict[str, Any] = {
        "dataset_path": output_path,
        "num_items": len(records),
        "sample": sample_preview,
    }
    if mode == "lazy":
        payload["suggested_parsing"] = suggested_parsing
    return _ok(payload)
@server.tool(
    "dataset.create_from_directory_azure_di",
    "Create a dataset JSON configured for Azure Document Intelligence reading (requires keys).",
    {
        "type": "object",
        "properties": {
            "directory": {"type": "string"},
            "include": {"type": "array", "items": {"type": "string"}},
            "exclude": {"type": "array", "items": {"type": "string"}},
            "doc_per_page": {"type": "boolean", "default": False},
            "output_path": {"type": "string"},
        },
        "required": ["directory", "output_path"],
        "additionalProperties": False,
    },
)
async def dataset_create_from_directory_azure_di(
    directory: str,
    output_path: str,
    include: List[str] | None = None,
    exclude: List[str] | None = None,
    doc_per_page: bool = False,
):
    # Validate keys up-front
    if not os.getenv("DOCUMENTINTELLIGENCE_API_KEY") or not os.getenv("DOCUMENTINTELLIGENCE_ENDPOINT"):
        return _error(
            "Azure Document Intelligence keys not set. Set DOCUMENTINTELLIGENCE_API_KEY and DOCUMENTINTELLIGENCE_ENDPOINT."
        )

    directory = _resolve_abs(directory)
    output_path = _resolve_abs(output_path)
    files = _collect_files(directory, include, exclude)

    if not files:
        return _error("No files found for given patterns", {"directory": directory})

    # Always lazy: rely on azure_di_read
    records = [{"file_path": f, "ext": os.path.splitext(f)[1].lower().lstrip(".")} for f in files]
    _dump_json(output_path, records)

    suggested_parsing = [
        {
            "function": "azure_di_read",
            "function_kwargs": {"doc_per_page": doc_per_page},
            "input_key": "file_path",
            "output_key": "src",
        }
    ]
    return _ok(
        {
            "dataset_path": output_path,
            "num_items": len(records),
            "sample": records[: min(3, len(records))],
            "suggested_parsing": suggested_parsing,
        }
    )


@server.tool(
    "dataset.sample",
    "Sample N rows from a dataset JSON created by create_from_directory.",
    {
        "type": "object",
        "properties": {
            "dataset_path": {"type": "string"},
            "n": {"type": "integer", "minimum": 1, "default": 3},
        },
        "required": ["dataset_path"],
        "additionalProperties": False,
    },
)
async def dataset_sample(dataset_path: str, n: int = 3):
    dataset_path = _resolve_abs(dataset_path)
    if not os.path.exists(dataset_path):
        return _error("Dataset path not found", {"dataset_path": dataset_path})
    with open(dataset_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, list):
        return _error("Dataset JSON is not a list of records", {"dataset_path": dataset_path})
    return _ok({"rows": data[: max(0, n)]})


@server.tool(
    "ops.list",
    "List available DocETL operations.",
    {"type": "object", "properties": {}, "additionalProperties": False},
)
async def ops_list():
    ops = get_operations()
    resp = []
    for name, cls in sorted(ops.items(), key=lambda x: x[0]):
        doc = (cls.__doc__ or "").strip()
        resp.append({"name": name, "doc": doc})
    return _ok({"operations": resp})


@server.tool(
    "parsing_tools.list",
    "List available DocETL parsing tools.",
    {"type": "object", "properties": {}, "additionalProperties": False},
)
async def parsing_tools_list():
    tools = get_parsing_tools()
    return _ok({"parsing_tools": sorted(tools)})


@server.tool(
    "pipeline.schema",
    "Return the DocETL JSON schema for pipeline configs.",
    {"type": "object", "properties": {}, "additionalProperties": False},
)
async def pipeline_schema():
    schema = DSLRunner.json_schema
    return _ok({"schema": schema})


@server.tool(
    "pipeline.validate",
    "Validate a pipeline (YAML string, YAML path, or config dict).",
    {
        "type": "object",
        "properties": {
            "yaml": {"type": "string"},
            "yaml_path": {"type": "string"},
            "config": {"type": "object"},
            "max_threads": {"type": "integer"},
        },
        "additionalProperties": False,
    },
)
async def pipeline_validate(
    yaml: str | None = None, yaml_path: str | None = None, config: dict | None = None, max_threads: int | None = None
):
    try:
        cfg = _try_load_yaml_or_json(yaml_path=yaml_path, yaml_text=yaml, config=config)
        # Build runner for syntax check only
        _ = DSLRunner(cfg, max_threads=max_threads)
        # The 'yaml' parameter shadows the module-level import, so re-import the
        # module locally before dumping the normalized config.
        import yaml as _yaml

        normalized = yaml if yaml is not None else _yaml.safe_dump(cfg)
        return _ok({"valid": True, "errors": [], "normalizedYaml": normalized})
    except Exception as e:
        return _ok({"valid": False, "errors": [str(e)]})
@server.tool(
    "pipeline.run",
    "Run a pipeline (YAML string, YAML path, or config dict).",
    {
        "type": "object",
        "properties": {
            "yaml": {"type": "string"},
            "yaml_path": {"type": "string"},
            "config": {"type": "object"},
            "max_threads": {"type": "integer"},
            "optimize": {"type": "boolean", "default": False},
            "save_optimized_path": {"type": "string"},
        },
        "additionalProperties": False,
    },
)
async def pipeline_run(
    yaml: str | None = None,
    yaml_path: str | None = None,
    config: dict | None = None,
    max_threads: int | None = None,
    optimize: bool = False,
    save_optimized_path: str | None = None,
):
    try:
        cfg = _try_load_yaml_or_json(yaml_path=yaml_path, yaml_text=yaml, config=config)
        runner = DSLRunner(cfg, max_threads=max_threads)
        cost: float
        if optimize:
            _, cost = runner.optimize(save=bool(save_optimized_path), save_path=save_optimized_path)
            # After optimize, re-run to produce outputs with optimized config.
            # The optimize() returns either a new runner or a dict; but we already saved optimized yaml if requested.
            # For simplicity, proceed to run the original runner (user can also run the optimized yaml separately).
        cost = runner.load_run_save()
        return _ok({"cost": cost, "output_path": runner.get_output_path(require=True)})
    except Exception as e:
        return _error("Pipeline run failed", {"message": str(e)})


# --------- Examples and Docs (minimal embedded content) ----------
EXAMPLES: Dict[str, str] = {
    "summarize-minimal": """\
datasets:
  input:
    type: file
    path: /abs/path/to/dataset.json

default_model: gpt-4o-mini

operations:
  - name: summarize
    type: map
    output:
      schema:
        summary: str
    prompt: |
      Summarize the following document briefly:
      {{ input.src }}

pipeline:
  steps:
    - name: step1
      input: input
      operations:
        - summarize
  output:
    type: file
    path: /abs/path/to/output.json
    intermediate_dir: intermediate
""",
    "split-gather": """\
datasets:
  input:
    type: file
    path: /abs/path/to/dataset.json

default_model: gpt-4o-mini

operations:
  - name: split
    type: split
    chunk_size: 2000
    chunk_overlap: 200
    text_key: src
    output:
      schema:
        chunk: str
  - name: analyze
    type: map
    output:
      schema:
        facts: list[str]
    prompt: |
      Extract salient facts from the chunk:
      {{ input.chunk }}
  - name: gather
    type: gather
    gather_key: path
    gather_size: 100
    output:
      schema:
        chunks: list[dict]
  - name: reduce
    type: reduce
    reduce_key: [path]
    output:
      schema:
        summary: str
    prompt: |
      Using all facts from these chunks, write a unified summary:
      {% for item in inputs %}- {{ item.facts }}{% endfor %}

pipeline:
  steps:
    - name: step1
      input: input
      operations:
        - split
        - analyze
        - gather
        - reduce
  output:
    type: file
    path: /abs/path/to/output.json
""",
    "resolve-deduplicate": """\
datasets:
  input:
    type: file
    path: /abs/path/to/dataset.json

default_model: gpt-4o-mini

operations:
  - name: extract
    type: map
    output:
      schema:
        company: str
    prompt: |
      From the document, extract the company name:
      {{ input.src }}
  - name: resolve_companies
    type: resolve
    blocking_keys: ["company"]
    blocking_threshold: 0.62
    comparison_prompt: |
      Are these two names the same company?
      1: {{ input1.company }}
      2: {{ input2.company }}
    output:
      schema:
        company: str
    resolution_prompt: |
      Determine a canonical company name for:
      {% for entry in inputs %}- {{ entry.company }}{% endfor %}
  - name: reduce_by_company
    type: reduce
    reduce_key: [company]
    output:
      schema:
        summary: str
    prompt: |
      Summarize information for {{ reduce_key }} using all inputs.
      {% for item in inputs %}- {{ item.src }}{% endfor %}

pipeline:
  steps:
    - name: step1
      input: input
      operations:
        - extract
        - resolve_companies
        - reduce_by_company
  output:
    type: file
    path: /abs/path/to/output.json
""",
}

DOCS: Dict[str, str] = {
    "index": "DocETL MCP: Ingest files into datasets, author arbitrary pipelines, and run them. Use ops.list, parsing_tools.list, pipeline.schema to compose pipelines.",
    "pipelines": "Pipelines consist of datasets, operations, steps, and output. See pipeline.schema for the full JSON schema.",
    "operators": "Supported operators: map, reduce, resolve, parallel_map, filter, equijoin, split, gather, unnest, cluster, sample, code_map, code_reduce, code_filter, extract, etc.",
}


@server.tool(
    "examples.list",
    "List available example pipeline templates.",
    {"type": "object", "properties": {}, "additionalProperties": False},
)
async def examples_list():
    return _ok({"examples": sorted(EXAMPLES.keys())})


@server.tool(
    "examples.get",
    "Get an example pipeline YAML by name.",
    {
        "type": "object",
        "properties": {"name": {"type": "string"}},
        "required": ["name"],
        "additionalProperties": False,
    },
)
async def examples_get(name: str):
    if name not in EXAMPLES:
        return _error("Example not found", {"available": sorted(EXAMPLES.keys())})
    return _ok({"name": name, "yaml": EXAMPLES[name]})


@server.tool(
    "docs.get",
    "Get a brief documentation page by name.",
    {
        "type": "object",
        "properties": {"name": {"type": "string"}},
        "required": ["name"],
        "additionalProperties": False,
    },
)
async def docs_get(name: str):
    if name not in DOCS:
        return _error("Doc not found", {"available": sorted(DOCS.keys())})
    return _ok({"name": name, "text": DOCS[name]})


async def _amain() -> None:
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream)


def main() -> None:
    asyncio.run(_amain())


if __name__ == "__main__":
    main()
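For orientation, here is a minimal sketch of a stdio client that exercises the tools above. It assumes the standard `mcp` Python client API (`StdioServerParameters`, `stdio_client`, `ClientSession`) and launches the server through the `docetl mcp serve` command registered in the CLI change above; the tool and example names come from the decorators and `EXAMPLES` dict in this file.

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Launch the DocETL MCP server over stdio, exactly as an AI client would.
    params = StdioServerParameters(command="docetl", args=["mcp", "serve"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Discover the tools registered in docetl_mcp/server.py.
            listed = await session.list_tools()
            print([tool.name for tool in listed.tools])

            # Fetch one of the embedded example pipelines by name.
            result = await session.call_tool("examples.get", {"name": "summarize-minimal"})
            print(result.content[0].text)


if __name__ == "__main__":
    asyncio.run(main())
```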
@@ -0,0 +1,132 @@
# MCP Server

DocETL ships an MCP (Model Context Protocol) server so you can attach it to AI clients (Cursor, ChatGPT Desktop, Claude Desktop) and use tools to:

- Create datasets from local directories
- Author and validate arbitrary DocETL pipelines
- Run pipelines and view outputs
- Browse examples and quick docs from within your client

## Installation

Install DocETL with the MCP and parsing extras:

```bash
pip install "docetl[parsing,mcp]"
```

Recommended: put your LLM API key in an `.env` file at your project root:

```
OPENAI_API_KEY=your_api_key_here
```

## Start the server

Two equivalent options:

```bash
docetl mcp serve
```

This starts a stdio MCP server compatible with tools like Cursor and ChatGPT Desktop.
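Given the `if __name__ == "__main__"` guard in `docetl_mcp/server.py` above, running the module directly should be an equivalent way to launch the same stdio server:

```bash
python -m docetl_mcp.server
```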
## Attach in clients

### Cursor example

Add to your Cursor settings:

```json
{
  "mcpServers": {
    "docetl": {
      "command": "docetl",
      "args": ["mcp", "serve"],
      "env": { "OPENAI_API_KEY": "YOUR_KEY" },
      "cwd": "/abs/path/your-project"
    }
  }
}
```

### Claude Desktop example

Create or edit your Claude Desktop MCP config file:

- macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
- Linux: `~/.config/Claude/claude_desktop_config.json`
- Windows: `%AppData%/Claude/claude_desktop_config.json`

Add an entry like:

```json
{
  "mcpServers": {
    "docetl": {
      "command": "docetl",
      "args": ["mcp", "serve"],
      "env": { "OPENAI_API_KEY": "YOUR_KEY" },
      "cwd": "/abs/path/your-project"
    }
  }
}
```

Restart Claude Desktop after saving the file.

### What is cwd?

`cwd` is the working directory for the DocETL MCP server process launched by your client.

- Relative paths in your pipeline YAML (e.g., datasets or output files) are resolved against this directory.
- DocETL looks for a `.env` file in `cwd` to load API keys (e.g., `OPENAI_API_KEY`).
- Set `cwd` to the folder where your datasets and pipeline files live.

Example:

- If your files are here:
    - `/Users/you/projects/my-docetl-run/dataset.json`
    - `/Users/you/projects/my-docetl-run/pipeline.yaml`
- Then set:
    - `cwd`: `/Users/you/projects/my-docetl-run`

#### No files yet?

That’s fine. Have your AI agent generate the pipeline YAML inline, then pass it directly to `pipeline.run`. Any relative paths in that YAML (like `datasets.input.path: dataset.json` or `pipeline.output.path: output.json`) will be created under `cwd` at run time. You do not need to create these files in advance.

## What it can do

- Turn a folder of PDFs/TXT/DOCX/PPTX/XLSX into a DocETL dataset
- Draft pipelines from your instructions (any operators: map, reduce, resolve, split/gather, code ops, etc.)
- Validate and run pipelines; write outputs to JSON/CSV
- Show examples and brief docs inside your AI client
- Optional: use Azure Document Intelligence for PDFs (only if you’ve set keys)

## How to use it (in your AI chat)

You don’t need to know any API names. Speak in natural language; the AI will use the MCP server under the hood.

1) Point it at your data
   - “I have documents in /abs/path/data (PDFs and TXTs). Prepare whatever you need to use that folder as a DocETL input. Prefer lazy parsing for PDFs. Use /abs/path/output.json for results.”

2) Ask it to draft a pipeline
   - “Create a minimal pipeline that summarizes each document into a single summary field. Use sensible defaults and make sure prompts reference the document text. Show me the YAML you plan to run.”

3) Have it run and show results
   - “Run the pipeline now, then show me: total cost, output file path, and the first 3 rows.”

4) Iterate
   - “Add a resolve stage to deduplicate by company name, then produce one summary per company. Re-run and show the results.”
   - “Save the final YAML as pipeline.yaml under the working directory, then run it again.”

## Azure usage (optional)

If you have Azure Document Intelligence credentials:

```bash
export DOCUMENTINTELLIGENCE_API_KEY=...
export DOCUMENTINTELLIGENCE_ENDPOINT=...
```

Use `dataset.create_from_directory_azure_di` to create a dataset configured for `azure_di_read`. By default, the server does not use Azure; you must opt into this tool explicitly.
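As a sketch of how that tool's `suggested_parsing` output could be applied, the returned fields map onto a dataset definition roughly as follows; the nesting of `parsing` under the dataset entry is an assumption about DocETL's dataset config rather than something stated on this page:

```yaml
datasets:
  input:
    type: file
    path: /abs/path/to/dataset.json   # written by dataset.create_from_directory_azure_di
    parsing:                          # assumed placement of the suggested_parsing entry
      - input_key: file_path
        function: azure_di_read
        function_kwargs:
          doc_per_page: false
        output_key: src
```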
@@ -76,6 +76,7 @@ nav:
      - Python API: api-reference/python.md
  - Tools & Resources:
      - MCP Server: execution/mcp.md
      - UI Playground:
          - Setup: playground/index.md
          - Tutorial: playground/tutorial.md
@@ -50,6 +50,9 @@ server = [
    "azure-ai-documentintelligence>=1.0.0b4",
    "httpx>=0.27.2",
]
mcp = [
    "mcp>=1.1.1",
]

[project.scripts]
docetl = "docetl.cli:app"
@ -110,8 +113,8 @@ requires = ["hatchling"]
|
|||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.hatch.build]
|
||||
packages = ["docetl"]
|
||||
include = ["docetl/**", "server/**", "README.md", "LICENSE"]
|
||||
packages = ["docetl", "docetl_mcp"]
|
||||
include = ["docetl/**", "docetl_mcp/**", "server/**", "README.md", "LICENSE"]
|
||||
exclude = ["website/**/*"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
|
|
|
|||
|
|
@@ -0,0 +1,5 @@
DocETL is an ETL framework for LLM-powered document processing.
This sample document contains a short paragraph describing the project.
The goal is to enable easy ingestion, transformation, and summarization.
@@ -0,0 +1,5 @@
Large Language Models can extract structure from unstructured text.
With DocETL, you can map, reduce, resolve, and split documents efficiently.
This file exists to help you quickly test the MCP server end-to-end.
@@ -0,0 +1,5 @@
This is a tiny third document for testing.
Feel free to modify, add, or remove files in this folder as needed.
The MCP server should discover these files and build a dataset.