Compare commits


1 Commits
main ... mcp

Author SHA1 Message Date
Shreya Shankar 0b5859fa33 in progress: mcp servers for docetl 2025-11-26 21:13:10 -06:00
9 changed files with 814 additions and 2 deletions

View File

@@ -9,6 +9,34 @@ from docetl.runner import DSLRunner
app = typer.Typer(pretty_exceptions_enable=False)
#
# MCP subcommands
#
mcp_app = typer.Typer(
pretty_exceptions_enable=False, help="Model Context Protocol (MCP) server utilities"
)
@mcp_app.command("serve")
def mcp_serve():
"""
Start the DocETL MCP server over stdio.
Requires the optional 'mcp' extra. Install via: pip install "docetl[mcp]"
"""
try:
from docetl_mcp.server import main as _mcp_main # type: ignore
except Exception as e:
typer.echo(
"DocETL MCP server requires the 'mcp' extra and its dependencies.\n"
'Install with:\n pip install "docetl[mcp]"\n\n'
f"Error: {e}"
)
raise typer.Exit(code=1)
_mcp_main()
app.add_typer(mcp_app, name="mcp")
@app.command()
def build(

5
docetl_mcp/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""
DocETL MCP server package.
"""

628
docetl_mcp/server.py Normal file
View File

@@ -0,0 +1,628 @@
import asyncio
import glob
import json
import os
from dataclasses import asdict
from typing import Any, Dict, Iterable, List, Tuple
import yaml
from docetl.runner import DSLRunner
from docetl.operations import get_operations
from docetl.parsing_tools import get_parsing_tools
# Optional imports guarded at runtime
try:
import fitz # PyMuPDF
except Exception: # pragma: no cover - optional dependency
fitz = None # type: ignore
try:
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent
except Exception as e: # pragma: no cover - server won't start without mcp installed
raise RuntimeError(
"The 'mcp' package is required to run the DocETL MCP server. "
"Install with: pip install mcp"
) from e
server = Server("docetl-mcp")
# --------------- Utility helpers ---------------
DEFAULT_INCLUDE_GLOBS = [
"**/*.txt",
"**/*.md",
"**/*.pdf",
"**/*.docx",
"**/*.pptx",
"**/*.xlsx",
]
SUPPORTED_EXTS = {".txt", ".md", ".pdf", ".docx", ".pptx", ".xlsx"}
def _resolve_abs(path: str) -> str:
return path if os.path.isabs(path) else os.path.abspath(path)
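# _collect_files gathers candidate documents for a dataset: it expands the include
# globs (DEFAULT_INCLUDE_GLOBS when none are given) under the base directory, keeps
# only files whose extensions are in SUPPORTED_EXTS, de-duplicates and sorts them,
# then drops anything matched by the exclude globs.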
def _collect_files(
directory: str, include: List[str] | None, exclude: List[str] | None
) -> List[str]:
base = _resolve_abs(directory)
patterns = include or DEFAULT_INCLUDE_GLOBS
exclude = exclude or []
files: List[str] = []
for pattern in patterns:
files.extend(glob.glob(os.path.join(base, pattern), recursive=True))
# De-duplicate and filter by supported extensions
files = sorted(
{f for f in files if os.path.isfile(f) and os.path.splitext(f)[1].lower() in SUPPORTED_EXTS}
)
# Apply exclude patterns
if exclude:
excluded: set[str] = set()
for pattern in exclude:
excluded.update(glob.glob(os.path.join(base, pattern), recursive=True))
files = [f for f in files if f not in excluded]
return files
def _read_text(path: str) -> str:
with open(path, "r", encoding="utf-8") as f:
return f.read()
def _read_pdf_text(path: str, doc_per_page: bool) -> List[str]:
if fitz is None:
# No PyMuPDF available; fall back to single record with placeholder
return [f"[PDF parsing unavailable: install docetl[parsing]] Path: {path}"]
texts: List[str] = []
with fitz.open(path) as doc:
if doc_per_page:
for idx, page in enumerate(doc):
texts.append(f"Page {idx+1}:\n{page.get_text() or ''}")
else:
buf = []
for idx, page in enumerate(doc):
buf.append(f"Page {idx+1}:\n{page.get_text() or ''}")
texts.append("\n\n".join(buf))
return texts
def _ensure_dir(path: str) -> None:
out_dir = os.path.dirname(path)
if out_dir:
os.makedirs(out_dir, exist_ok=True)
def _dump_json(path: str, data: Any) -> None:
_ensure_dir(path)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
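# _try_load_yaml_or_json resolves the pipeline config for the tools below: an explicit
# config dict wins over inline YAML text, which wins over a YAML file path; at least
# one of the three must be provided.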
def _try_load_yaml_or_json(
yaml_path: str | None = None, yaml_text: str | None = None, config: dict | None = None
) -> dict:
if config is not None:
return config
if yaml_text is not None:
return yaml.safe_load(yaml_text)
if yaml_path is not None:
with open(_resolve_abs(yaml_path), "r", encoding="utf-8") as f:
return yaml.safe_load(f)
raise ValueError("Provide one of: config (dict), yaml (str), or yaml_path (str)")
def _ok(payload: dict) -> List[TextContent]:
return [TextContent(type="text", text=json.dumps(payload, indent=2))]
def _error(message: str, extra: dict | None = None) -> List[TextContent]:
payload = {"error": message}
if extra:
payload.update(extra)
return [TextContent(type="text", text=json.dumps(payload, indent=2))]
# --------------- Tools ---------------
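# dataset.create_from_directory supports two modes: "eager" extracts text immediately
# for .txt/.md and .pdf (via PyMuPDF when available) and stores it under "src",
# falling back to file-path records for other formats, while "lazy" stores only file
# paths plus suggested parsing-tool configs so text is extracted at pipeline run time.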
@server.tool(
"dataset.create_from_directory",
"Create a DocETL dataset JSON from files in a directory.",
{
"type": "object",
"properties": {
"directory": {"type": "string"},
"include": {"type": "array", "items": {"type": "string"}},
"exclude": {"type": "array", "items": {"type": "string"}},
"mode": {"type": "string", "enum": ["eager", "lazy"], "default": "eager"},
"doc_per_page": {"type": "boolean", "default": False},
"pdf_ocr": {"type": "boolean", "default": False},
"output_path": {"type": "string"},
"include_metadata": {"type": "boolean", "default": True},
},
"required": ["directory", "output_path"],
"additionalProperties": False,
},
)
async def dataset_create_from_directory(
directory: str,
output_path: str,
include: List[str] | None = None,
exclude: List[str] | None = None,
mode: str = "eager",
doc_per_page: bool = False,
pdf_ocr: bool = False,
include_metadata: bool = True,
):
directory = _resolve_abs(directory)
output_path = _resolve_abs(output_path)
files = _collect_files(directory, include, exclude)
if not files:
return _error("No files found for given patterns", {"directory": directory})
records: List[dict] = []
suggested_parsing: List[dict] = []
# Suggested parsing only when in lazy mode
if mode == "lazy":
# Map extensions to parser names and kwargs
lazy_map: Dict[str, Tuple[str, Dict[str, Any]]] = {
".txt": ("txt_to_string", {}),
".md": ("txt_to_string", {}),
".docx": ("docx_to_string", {}),
".pptx": ("pptx_to_string", {}),
".xlsx": ("xlsx_to_string", {}),
".pdf": ("paddleocr_pdf_to_string", {"doc_per_page": doc_per_page, "ocr_enabled": pdf_ocr}),
}
for f in files:
_, ext = os.path.splitext(f)
ext = ext.lower()
rec = {"file_path": f, "ext": ext.lstrip(".")}
records.append(rec)
# One consolidated suggestion per extension present
seen_exts = {os.path.splitext(f)[1].lower() for f in files}
for ext in sorted(seen_exts):
if ext in lazy_map:
function, kwargs = lazy_map[ext]
suggested_parsing.append(
{
"function": function,
"function_kwargs": kwargs,
"input_key": "file_path",
"output_key": "src",
}
)
else:
# Eager mode: create records with extracted text for a subset of formats.
for f in files:
_, ext = os.path.splitext(f)
ext = ext.lower()
if ext in {".txt", ".md"}:
texts = [_read_text(f)]
elif ext == ".pdf":
texts = _read_pdf_text(f, doc_per_page=doc_per_page)
else:
# Unsupported eager extraction → fall back to lazy-like record (file_path only)
texts = []
if texts:
for text in texts:
rec = {"src": text}
if include_metadata:
rec.update({"path": f, "ext": ext.lstrip(".")})
records.append(rec)
else:
# If we cannot eagerly parse, at least include as a file reference
records.append({"file_path": f, "ext": ext.lstrip(".")})
_dump_json(output_path, records)
sample_preview = records[: min(3, len(records))]
payload: Dict[str, Any] = {
"dataset_path": output_path,
"num_items": len(records),
"sample": sample_preview,
}
if mode == "lazy":
payload["suggested_parsing"] = suggested_parsing
return _ok(payload)
@server.tool(
"dataset.create_from_directory_azure_di",
"Create a dataset JSON configured for Azure Document Intelligence reading (requires keys).",
{
"type": "object",
"properties": {
"directory": {"type": "string"},
"include": {"type": "array", "items": {"type": "string"}},
"exclude": {"type": "array", "items": {"type": "string"}},
"doc_per_page": {"type": "boolean", "default": False},
"output_path": {"type": "string"},
},
"required": ["directory", "output_path"],
"additionalProperties": False,
},
)
async def dataset_create_from_directory_azure_di(
directory: str,
output_path: str,
include: List[str] | None = None,
exclude: List[str] | None = None,
doc_per_page: bool = False,
):
# Validate keys up-front
if not os.getenv("DOCUMENTINTELLIGENCE_API_KEY") or not os.getenv("DOCUMENTINTELLIGENCE_ENDPOINT"):
return _error(
"Azure Document Intelligence keys not set. Set DOCUMENTINTELLIGENCE_API_KEY and DOCUMENTINTELLIGENCE_ENDPOINT."
)
directory = _resolve_abs(directory)
output_path = _resolve_abs(output_path)
files = _collect_files(directory, include, exclude)
if not files:
return _error("No files found for given patterns", {"directory": directory})
# Always lazy: rely on azure_di_read
records = [{"file_path": f, "ext": os.path.splitext(f)[1].lower().lstrip(".")} for f in files]
_dump_json(output_path, records)
suggested_parsing = [
{
"function": "azure_di_read",
"function_kwargs": {"doc_per_page": doc_per_page},
"input_key": "file_path",
"output_key": "src",
}
]
return _ok(
{
"dataset_path": output_path,
"num_items": len(records),
"sample": records[: min(3, len(records))],
"suggested_parsing": suggested_parsing,
}
)
@server.tool(
"dataset.sample",
"Sample N rows from a dataset JSON created by create_from_directory.",
{
"type": "object",
"properties": {
"dataset_path": {"type": "string"},
"n": {"type": "integer", "minimum": 1, "default": 3},
},
"required": ["dataset_path"],
"additionalProperties": False,
},
)
async def dataset_sample(dataset_path: str, n: int = 3):
dataset_path = _resolve_abs(dataset_path)
if not os.path.exists(dataset_path):
return _error("Dataset path not found", {"dataset_path": dataset_path})
with open(dataset_path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
return _error("Dataset JSON is not a list of records", {"dataset_path": dataset_path})
return _ok({"rows": data[: max(0, n)]})
@server.tool(
"ops.list",
"List available DocETL operations.",
{"type": "object", "properties": {}, "additionalProperties": False},
)
async def ops_list():
ops = get_operations()
resp = []
for name, cls in sorted(ops.items(), key=lambda x: x[0]):
doc = (cls.__doc__ or "").strip()
resp.append({"name": name, "doc": doc})
return _ok({"operations": resp})
@server.tool(
"parsing_tools.list",
"List available DocETL parsing tools.",
{"type": "object", "properties": {}, "additionalProperties": False},
)
async def parsing_tools_list():
tools = get_parsing_tools()
return _ok({"parsing_tools": sorted(tools)})
@server.tool(
"pipeline.schema",
"Return the DocETL JSON schema for pipeline configs.",
{"type": "object", "properties": {}, "additionalProperties": False},
)
async def pipeline_schema():
schema = DSLRunner.json_schema
return _ok({"schema": schema})
@server.tool(
"pipeline.validate",
"Validate a pipeline (YAML string, YAML path, or config dict).",
{
"type": "object",
"properties": {
"yaml": {"type": "string"},
"yaml_path": {"type": "string"},
"config": {"type": "object"},
"max_threads": {"type": "integer"},
},
"additionalProperties": False,
},
)
async def pipeline_validate(
yaml: str | None = None, yaml_path: str | None = None, config: dict | None = None, max_threads: int | None = None
):
try:
cfg = _try_load_yaml_or_json(yaml_path=yaml_path, yaml_text=yaml, config=config)
# Build runner for syntax check only
_ = DSLRunner(cfg, max_threads=max_threads)
        # The 'yaml' parameter shadows the yaml module here, so re-import the module
        # under a local alias before serializing the parsed config back to YAML text.
        import yaml as _yaml

        normalized = yaml if yaml else _yaml.safe_dump(cfg)
return _ok({"valid": True, "errors": [], "normalizedYaml": normalized})
except Exception as e:
return _ok({"valid": False, "errors": [str(e)]})
@server.tool(
"pipeline.run",
"Run a pipeline (YAML string, YAML path, or config dict).",
{
"type": "object",
"properties": {
"yaml": {"type": "string"},
"yaml_path": {"type": "string"},
"config": {"type": "object"},
"max_threads": {"type": "integer"},
"optimize": {"type": "boolean", "default": False},
"save_optimized_path": {"type": "string"},
},
"additionalProperties": False,
},
)
async def pipeline_run(
yaml: str | None = None,
yaml_path: str | None = None,
config: dict | None = None,
max_threads: int | None = None,
optimize: bool = False,
save_optimized_path: str | None = None,
):
try:
cfg = _try_load_yaml_or_json(yaml_path=yaml_path, yaml_text=yaml, config=config)
runner = DSLRunner(cfg, max_threads=max_threads)
        optimization_cost = 0.0
        if optimize:
            # Optimize first; when save_optimized_path is set, the optimized YAML is
            # persisted so the user can run it separately later.
            _, optimization_cost = runner.optimize(
                save=bool(save_optimized_path), save_path=save_optimized_path
            )
        # For simplicity, always execute the original runner and report the combined
        # cost of optimization (if any) plus execution.
        cost = optimization_cost + runner.load_run_save()
return _ok({"cost": cost, "output_path": runner.get_output_path(require=True)})
except Exception as e:
return _error("Pipeline run failed", {"message": str(e)})
# --------- Examples and Docs (minimal embedded content) ----------
EXAMPLES: Dict[str, str] = {
"summarize-minimal": """\
datasets:
input:
type: file
path: /abs/path/to/dataset.json
default_model: gpt-4o-mini
operations:
- name: summarize
type: map
output:
schema:
summary: str
prompt: |
Summarize the following document briefly:
{{ input.src }}
pipeline:
steps:
- name: step1
input: input
operations:
- summarize
output:
type: file
path: /abs/path/to/output.json
intermediate_dir: intermediate
""",
"split-gather": """\
datasets:
input:
type: file
path: /abs/path/to/dataset.json
default_model: gpt-4o-mini
operations:
- name: split
type: split
chunk_size: 2000
chunk_overlap: 200
text_key: src
output:
schema:
chunk: str
- name: analyze
type: map
output:
schema:
facts: list[str]
prompt: |
Extract salient facts from the chunk:
{{ input.chunk }}
- name: gather
type: gather
gather_key: path
gather_size: 100
output:
schema:
chunks: list[dict]
- name: reduce
type: reduce
reduce_key: [path]
output:
schema:
summary: str
prompt: |
Using all facts from these chunks, write a unified summary:
{% for item in inputs %}- {{ item.facts }}{% endfor %}
pipeline:
steps:
- name: step1
input: input
operations:
- split
- analyze
- gather
- reduce
output:
type: file
path: /abs/path/to/output.json
""",
"resolve-deduplicate": """\
datasets:
input:
type: file
path: /abs/path/to/dataset.json
default_model: gpt-4o-mini
operations:
- name: extract
type: map
output:
schema:
company: str
prompt: |
From the document, extract the company name:
{{ input.src }}
- name: resolve_companies
type: resolve
blocking_keys: ["company"]
blocking_threshold: 0.62
comparison_prompt: |
Are these two names the same company?
1: {{ input1.company }}
2: {{ input2.company }}
output:
schema:
company: str
resolution_prompt: |
Determine a canonical company name for:
{% for entry in inputs %}- {{ entry.company }}{% endfor %}
- name: reduce_by_company
type: reduce
reduce_key: [company]
output:
schema:
summary: str
prompt: |
Summarize information for {{ reduce_key }} using all inputs.
{% for item in inputs %}- {{ item.src }}{% endfor %}
pipeline:
steps:
- name: step1
input: input
operations:
- extract
- resolve_companies
- reduce_by_company
output:
type: file
path: /abs/path/to/output.json
""",
}
DOCS: Dict[str, str] = {
"index": "DocETL MCP: Ingest files into datasets, author arbitrary pipelines, and run them. Use ops.list, parsing_tools.list, pipeline.schema to compose pipelines.",
"pipelines": "Pipelines consist of datasets, operations, steps, and output. See pipeline.schema for the full JSON schema.",
"operators": "Supported operators: map, reduce, resolve, parallel_map, filter, equijoin, split, gather, unnest, cluster, sample, code_map, code_reduce, code_filter, extract, etc.",
}
@server.tool(
"examples.list",
"List available example pipeline templates.",
{"type": "object", "properties": {}, "additionalProperties": False},
)
async def examples_list():
return _ok({"examples": sorted(EXAMPLES.keys())})
@server.tool(
"examples.get",
"Get an example pipeline YAML by name.",
{
"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"],
"additionalProperties": False,
},
)
async def examples_get(name: str):
if name not in EXAMPLES:
return _error("Example not found", {"available": sorted(EXAMPLES.keys())})
return _ok({"name": name, "yaml": EXAMPLES[name]})
@server.tool(
"docs.get",
"Get a brief documentation page by name.",
{
"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"],
"additionalProperties": False,
},
)
async def docs_get(name: str):
if name not in DOCS:
return _error("Doc not found", {"available": sorted(DOCS.keys())})
return _ok({"name": name, "text": DOCS[name]})
async def _amain() -> None:
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream)
def main() -> None:
asyncio.run(_amain())
if __name__ == "__main__":
main()

132
docs/execution/mcp.md Normal file
View File

@@ -0,0 +1,132 @@
# MCP Server
DocETL ships an MCP (Model Context Protocol) server so you can attach it to AI clients (Cursor, ChatGPT Desktop, Claude Desktop) and use tools to:
- Create datasets from local directories
- Author and validate arbitrary DocETL pipelines
- Run pipelines and view outputs
- Browse examples and quick docs from within your client
## Installation
Install DocETL with the MCP and parsing extras:
```bash
pip install "docetl[parsing,mcp]"
```
Recommended: put your LLM API key in a `.env` file at your project root:
```
OPENAI_API_KEY=your_api_key_here
```
## Start the server
From your project directory, run:
```bash
docetl mcp serve
```
This starts a stdio MCP server compatible with tools like Cursor and ChatGPT Desktop.
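If you want to sanity-check the server outside of an AI client, a minimal sketch using the `mcp` Python SDK's stdio client is shown below. Treat it as an illustration rather than a supported entry point: the exact client API depends on your installed `mcp` version, and the script name is hypothetical.

```python
# check_docetl_mcp.py - hypothetical smoke test for the DocETL MCP server.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Launch "docetl mcp serve" as a subprocess and talk to it over stdio.
    params = StdioServerParameters(command="docetl", args=["mcp", "serve"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print("Available tools:", [t.name for t in tools.tools])


asyncio.run(main())
```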
## Attach in clients
### Cursor example
Add to your Cursor settings:
```json
{
"mcpServers": {
"docetl": {
"command": "docetl",
"args": ["mcp", "serve"],
"env": { "OPENAI_API_KEY": "YOUR_KEY" },
"cwd": "/abs/path/your-project"
}
}
}
```
### Claude Desktop example
Create or edit your Claude Desktop MCP config file:
- macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
- Linux: `~/.config/Claude/claude_desktop_config.json`
- Windows: `%APPDATA%\Claude\claude_desktop_config.json`
Add an entry like:
```json
{
"mcpServers": {
"docetl": {
"command": "docetl",
"args": ["mcp", "serve"],
"env": { "OPENAI_API_KEY": "YOUR_KEY" },
"cwd": "/abs/path/your-project"
}
}
}
```
Restart Claude Desktop after saving the file.
### What is cwd?
`cwd` is the working directory for the DocETL MCP server process launched by your client.
- Relative paths in your pipeline YAML (e.g., datasets or output files) are resolved against this directory.
- DocETL looks for a `.env` file in `cwd` to load API keys (e.g., `OPENAI_API_KEY`).
- Set `cwd` to the folder where your datasets and pipeline files live.
Example:
- If your files are here:
- `/Users/you/projects/my-docetl-run/dataset.json`
- `/Users/you/projects/my-docetl-run/pipeline.yaml`
- Then set:
- `cwd`: `/Users/you/projects/my-docetl-run`
#### No files yet?
That's fine. Have your AI agent generate the pipeline YAML inline, then pass it directly to `pipeline.run`. Any relative paths in that YAML (like `datasets.input.path: dataset.json` or `pipeline.output.path: output.json`) are resolved under `cwd` at run time, so you do not need to create these files in advance.
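As a rough sketch of what that looks like under the hood (assuming the stdio client session from the earlier example; the `run_inline_pipeline` helper is hypothetical), an agent can pass the inline YAML straight to `pipeline.run`. The dataset and output paths below are relative, so they resolve under `cwd`:

```python
PIPELINE_YAML = """
datasets:
  input:
    type: file
    path: dataset.json          # e.g. produced earlier by dataset.create_from_directory
default_model: gpt-4o-mini
operations:
  - name: summarize
    type: map
    output:
      schema:
        summary: str
    prompt: |
      Summarize the following document briefly:
      {{ input.src }}
pipeline:
  steps:
    - name: step1
      input: input
      operations:
        - summarize
  output:
    type: file
    path: output.json           # created under cwd at run time
"""


async def run_inline_pipeline(session) -> None:
    # 'session' is an initialized mcp ClientSession (see the earlier sketch).
    result = await session.call_tool("pipeline.run", arguments={"yaml": PIPELINE_YAML})
    print(result)
```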
## What it can do
- Turn a folder of PDFs/TXT/DOCX/PPTX/XLSX into a DocETL dataset
- Draft pipelines from your instructions (any operators: map, reduce, resolve, split/gather, code ops, etc.)
- Validate and run pipelines; write outputs to JSON/CSV
- Show examples and brief docs inside your AI client
- Optional: use Azure Document Intelligence for PDFs (only if you've set keys)
## How to use it (in your AI chat)
You don't need to know any API names. Speak in natural language; the AI will use the MCP server under the hood.
1) Point it at your data
- “I have documents in /abs/path/data (PDFs and TXTs). Prepare whatever you need to use that folder as a DocETL input. Prefer lazy parsing for PDFs. Use /abs/path/output.json for results.”
2) Ask it to draft a pipeline
- “Create a minimal pipeline that summarizes each document into a single summary field. Use sensible defaults and make sure prompts reference the document text. Show me the YAML you plan to run.”
3) Have it run and show results
- “Run the pipeline now, then show me: total cost, output file path, and the first 3 rows.”
4) Iterate
- “Add a resolve stage to deduplicate by company name, then produce one summary per company. Re-run and show the results.”
- “Save the final YAML as pipeline.yaml under the working directory, then run it again.”
## Azure usage (optional)
If you have Azure Document Intelligence credentials:
```bash
export DOCUMENTINTELLIGENCE_API_KEY=...
export DOCUMENTINTELLIGENCE_ENDPOINT=...
```
Use `dataset.create_from_directory_azure_di` to create a dataset configured for `azure_di_read`. By default, the server does not use Azure; you must opt into this tool explicitly.
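For reference, a call to that tool from the stdio client looks roughly like the sketch below (the directory and output paths are placeholders, and the `build_azure_dataset` helper is hypothetical):

```python
async def build_azure_dataset(session) -> None:
    # 'session' is an initialized mcp ClientSession; the paths are placeholders.
    result = await session.call_tool(
        "dataset.create_from_directory_azure_di",
        arguments={
            "directory": "/abs/path/data",
            "output_path": "/abs/path/dataset.json",
            "doc_per_page": True,
        },
    )
    # The response records only file paths and suggests an `azure_di_read` parsing
    # step; the actual Azure call happens when the pipeline runs.
    print(result)
```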

View File

@@ -76,6 +76,7 @@ nav:
- Python API: api-reference/python.md
- Tools & Resources:
- MCP Server: execution/mcp.md
- UI Playground:
- Setup: playground/index.md
- Tutorial: playground/tutorial.md

View File

@@ -50,6 +50,9 @@ server = [
"azure-ai-documentintelligence>=1.0.0b4",
"httpx>=0.27.2",
]
mcp = [
"mcp>=1.1.1",
]
[project.scripts]
docetl = "docetl.cli:app"
@@ -110,8 +113,8 @@ requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build]
packages = ["docetl"]
include = ["docetl/**", "server/**", "README.md", "LICENSE"]
packages = ["docetl", "docetl_mcp"]
include = ["docetl/**", "docetl_mcp/**", "server/**", "README.md", "LICENSE"]
exclude = ["website/**/*"]
[tool.pytest.ini_options]

5
tests/mcp/data/doc1.txt Normal file
View File

@@ -0,0 +1,5 @@
DocETL is an ETL framework for LLM-powered document processing.
This sample document contains a short paragraph describing the project.
The goal is to enable easy ingestion, transformation, and summarization.

5
tests/mcp/data/doc2.txt Normal file
View File

@@ -0,0 +1,5 @@
Large Language Models can extract structure from unstructured text.
With DocETL, you can map, reduce, resolve, and split documents efficiently.
This file exists to help you quickly test the MCP server end-to-end.

5
tests/mcp/data/doc3.txt Normal file
View File

@@ -0,0 +1,5 @@
This is a tiny third document for testing.
Feel free to modify, add, or remove files in this folder as needed.
The MCP server should discover these files and build a dataset.