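"""Tests for the docetl Python API: building, optimizing, and running
Pipelines over file- and memory-backed Datasets, plus the code-backed
(CodeMapOp/CodeFilterOp/CodeReduceOp) and LLM-backed operation schemas."""
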
import json
import os
import tempfile

import pandas as pd
import pytest
from dotenv import load_dotenv

from docetl.api import (
    CodeFilterOp,
    CodeMapOp,
    CodeReduceOp,
    Dataset,
    EquijoinOp,
    ExtractOp,
    FilterOp,
    MapOp,
    ParallelMapOp,
    Pipeline,
    PipelineOutput,
    PipelineStep,
    ReduceOp,
    ResolveOp,
)

load_dotenv()


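# -----------------------------
# Shared fixtures
# -----------------------------

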
@pytest.fixture
def default_model():
    return "gpt-4o-mini"


@pytest.fixture
def max_threads():
    return 4


@pytest.fixture
def temp_input_file():
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp:
        json.dump(
            [
                {"text": "This is a positive sentence.", "group": "A"},
                {"text": "This is a negative sentence.", "group": "B"},
                {"text": "This is a neutral sentence.", "group": "A"},
            ],
            tmp,
        )
    yield tmp.name
    os.unlink(tmp.name)


@pytest.fixture(params=["file", "memory"])
def temp_input_dataset(request, temp_input_file):
    if request.param == "file":
        return Dataset(type="file", path=temp_input_file)
    else:
        # For the in-memory variant, load the records into a DataFrame first.
        df = pd.read_json(temp_input_file)
        return Dataset(type="memory", path=df)


@pytest.fixture
def temp_output_file():
    with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as tmp:
        pass
    yield tmp.name
    os.unlink(tmp.name)


@pytest.fixture
def temp_intermediate_dir():
    with tempfile.TemporaryDirectory() as tmpdirname:
        yield tmpdirname


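# -----------------------------
# Operation config fixtures
# -----------------------------

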
@pytest.fixture
def map_config():
    return MapOp(
        name="sentiment_analysis",
        type="map",
        prompt="Analyze the sentiment of the following text: '{{ input.text }}'. Classify it as either positive, negative, or neutral.",
        output={"schema": {"sentiment": "string"}},
        model="gpt-4o-mini",
    )


@pytest.fixture
def reduce_config():
    return ReduceOp(
        name="group_summary",
        type="reduce",
        reduce_key="group",
        prompt="Summarize the following group of values: {{ inputs }} Provide a total and any other relevant statistics.",
        output={"schema": {"total": "number", "avg": "number"}},
        model="gpt-4o-mini",
    )


@pytest.fixture
def parallel_map_config():
    return ParallelMapOp(
        name="sentiment_and_word_count",
        type="parallel_map",
        prompts=[
            {
                "name": "sentiment",
                "prompt": "Analyze the sentiment of the following text: '{{ input.text }}'. Classify it as either positive, negative, or neutral.",
                "output_keys": ["sentiment"],
                "model": "gpt-4o-mini",
            },
            {
                "name": "word_count",
                "prompt": "Count the number of words in the following text: '{{ input.text }}'. Return the count as an integer.",
                "output_keys": ["word_count"],
                "model": "gpt-4o-mini",
            },
        ],
        output={"schema": {"sentiment": "string", "word_count": "integer"}},
    )


@pytest.fixture
def filter_config():
    return FilterOp(
        name="positive_sentiment_filter",
        type="filter",
        prompt="Is the sentiment of the following text positive? '{{ input.text }}'. Return true if positive, false otherwise.",
        model="gpt-4o-mini",
        output={"schema": {"filtered": "boolean"}},
    )


@pytest.fixture
def resolve_config():
    return ResolveOp(
        name="name_email_resolver",
        type="resolve",
        blocking_keys=["name", "email"],
        blocking_threshold=0.8,
        comparison_prompt="Compare the following two entries and determine if they likely refer to the same person: Person 1: {{ input1 }} Person 2: {{ input2 }} Return true if they likely match, false otherwise.",
        output={"schema": {"name": "string", "email": "string"}},
        embedding_model="text-embedding-3-small",
        comparison_model="gpt-4o-mini",
        resolution_model="gpt-4o-mini",
        resolution_prompt="Given the following list of similar entries, determine one common name and email. {{ inputs }}",
    )


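# -----------------------------
# Sample data fixtures
# -----------------------------

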
@pytest.fixture
def reduce_sample_data(temp_input_file):
    data = [
        {"group": "A", "value": 10},
        {"group": "B", "value": 20},
        {"group": "A", "value": 15},
        {"group": "B", "value": 25},
    ]
    with open(temp_input_file, "w") as f:
        json.dump(data, f)
    return temp_input_file


@pytest.fixture
def resolve_sample_data(temp_input_file):
    data = [
        {"name": "John Doe"},
        {"name": "Jane Smith"},
        {"name": "Bob Johnson"},
    ]
    with open(temp_input_file, "w") as f:
        json.dump(data, f)
    return temp_input_file


@pytest.fixture
def left_data():
    # Each side of the equijoin needs its own file. Reusing the shared
    # temp_input_file fixture here would hand both left_data and right_data
    # the same path within a test, so the second write would clobber the first.
    data = [
        {"id": "1", "name": "John Doe"},
        {"id": "2", "name": "Jane Smith"},
        {"id": "3", "name": "Bob Johnson"},
    ]
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp:
        json.dump(data, tmp)
    yield tmp.name
    os.unlink(tmp.name)


@pytest.fixture
def right_data():
    data = [
        {"id": "1", "email": "john@example.com", "age": 30},
        {"id": "2", "email": "jane@example.com", "age": 28},
        {"id": "3", "email": "bob@example.com", "age": 35},
    ]
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp:
        json.dump(data, tmp)
    yield tmp.name
    os.unlink(tmp.name)


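# -----------------------------
# Pipeline tests
# -----------------------------

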
def test_pipeline_creation(
    map_config, reduce_config, temp_input_dataset, temp_output_file, temp_intermediate_dir
):
    pipeline = Pipeline(
        name="test_pipeline",
        datasets={"test_input": temp_input_dataset},
        operations=[map_config, reduce_config],
        steps=[
            PipelineStep(
                name="map_step", input="test_input", operations=["sentiment_analysis"]
            ),
            PipelineStep(
                name="reduce_step", input="map_step", operations=["group_summary"]
            ),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
        system_prompt={
            "dataset_description": "a collection of personal information records",
            "persona": "a data analyst processing and summarizing personal information",
        },
    )

    assert isinstance(pipeline, Pipeline)
    assert len(pipeline.operations) == 2
    assert len(pipeline.steps) == 2


def test_pipeline_optimization(
    map_config, reduce_config, temp_input_dataset, temp_output_file, temp_intermediate_dir
):
    pipeline = Pipeline(
        name="test_pipeline",
        datasets={"test_input": temp_input_dataset},
        operations=[map_config, reduce_config],
        steps=[
            PipelineStep(
                name="map_step", input="test_input", operations=["sentiment_analysis"]
            ),
            PipelineStep(
                name="reduce_step", input="map_step", operations=["group_summary"]
            ),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
        optimizer_config={
            "rewrite_agent_model": "gpt-4o",
            "judge_agent_model": "gpt-4o-mini",
        },
    )

    optimized_pipeline = pipeline.optimize(max_threads=64)

    assert isinstance(optimized_pipeline, Pipeline)
    # The optimizer is expected to insert exactly one additional operation
    # (e.g. a resolve ahead of the reduce).
    assert len(optimized_pipeline.operations) == len(pipeline.operations) + 1
    assert len(optimized_pipeline.steps) == len(pipeline.steps)


def test_pipeline_execution(
    map_config, temp_input_dataset, temp_output_file, temp_intermediate_dir
):
    pipeline = Pipeline(
        name="test_pipeline",
        datasets={"test_input": temp_input_dataset},
        operations=[map_config],
        steps=[
            PipelineStep(
                name="map_step", input="test_input", operations=["sentiment_analysis"]
            ),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
    )

    cost = pipeline.run(max_threads=4)

    assert isinstance(cost, float)


# -----------------------------
# Code operations (SDK) tests
# -----------------------------
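

# Plain-Python transforms passed as the `code` argument of the Code* operations below.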
def _code_map_transform(doc: dict) -> dict:
    x = doc.get("x", 0)
    return {"double": x * 2}


def _code_filter_transform(doc: dict) -> bool:
    return bool(doc.get("keep", False))


def _code_reduce_transform(group: list[dict]) -> dict:
    total = sum(item.get("value", 0) for item in group)
    return {"group_total": total}


@pytest.fixture
def code_input_file():
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp:
        json.dump(
            [
                {"x": 1},
                {"x": 2},
                {"x": 3},
            ],
            tmp,
        )
    yield tmp.name
    os.unlink(tmp.name)


@pytest.fixture
def code_filter_input_file():
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp:
        json.dump(
            [
                {"id": 1, "keep": True},
                {"id": 2, "keep": False},
                {"id": 3, "keep": True},
            ],
            tmp,
        )
    yield tmp.name
    os.unlink(tmp.name)


@pytest.fixture
def code_reduce_input_file():
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp:
        json.dump(
            [
                {"group": "A", "value": 10},
                {"group": "A", "value": 5},
                {"group": "B", "value": 7},
            ],
            tmp,
        )
    yield tmp.name
    os.unlink(tmp.name)


def test_code_map_pipeline_callable(code_input_file, temp_output_file, temp_intermediate_dir):
    pipeline = Pipeline(
        name="test_code_map",
        datasets={"input": Dataset(type="file", path=code_input_file)},
        operations=[
            CodeMapOp(
                name="double_x",
                type="code_map",
                code=_code_map_transform,
            )
        ],
        steps=[
            PipelineStep(name="s1", input="input", operations=["double_x"]),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
    )

    cost = pipeline.run(max_threads=4)
    assert isinstance(cost, float)

    with open(temp_output_file, "r") as f:
        data = json.load(f)
    assert len(data) == 3
    assert data[0]["double"] == 2
    assert data[1]["double"] == 4
    assert data[2]["double"] == 6


def test_code_filter_pipeline_callable(code_filter_input_file, temp_output_file, temp_intermediate_dir):
    pipeline = Pipeline(
        name="test_code_filter",
        datasets={"input": Dataset(type="file", path=code_filter_input_file)},
        operations=[
            CodeFilterOp(
                name="keep_true",
                type="code_filter",
                code=_code_filter_transform,
            )
        ],
        steps=[
            PipelineStep(name="s1", input="input", operations=["keep_true"]),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
    )

    cost = pipeline.run(max_threads=4)
    assert isinstance(cost, float)

    with open(temp_output_file, "r") as f:
        data = json.load(f)
    # Only ids 1 and 3 should be kept
    kept_ids = sorted([d["id"] for d in data])
    assert kept_ids == [1, 3]


def test_code_reduce_pipeline_callable(code_reduce_input_file, temp_output_file, temp_intermediate_dir):
    pipeline = Pipeline(
        name="test_code_reduce",
        datasets={"input": Dataset(type="file", path=code_reduce_input_file)},
        operations=[
            CodeReduceOp(
                name="sum_by_group",
                type="code_reduce",
                code=_code_reduce_transform,
                reduce_key="group",
                pass_through=True,
            )
        ],
        steps=[
            PipelineStep(name="s1", input="input", operations=["sum_by_group"]),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
    )

    cost = pipeline.run(max_threads=4)
    assert isinstance(cost, float)

    with open(temp_output_file, "r") as f:
        data = json.load(f)
    # Expect one row per group
    assert len(data) == 2
    # Create a map from group -> total
    totals = {d["group"]: d["group_total"] for d in data}
    assert totals["A"] == 15
    assert totals["B"] == 7


def test_extractop_is_exported():
    # Ensure ExtractOp is importable and constructible from API schemas
    op = ExtractOp(
        name="extract_sections",
        type="extract",
        document_keys=["content"],
        prompt="Extract important parts from {{ input.content }}",
        extraction_method="line_number",
    )
    assert op.type == "extract"


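# -----------------------------
# LLM operation pipeline tests
# -----------------------------

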
def test_parallel_map_pipeline(
    parallel_map_config, temp_input_file, temp_output_file, temp_intermediate_dir
):
    pipeline = Pipeline(
        name="test_pipeline",
        datasets={"test_input": Dataset(type="file", path=temp_input_file)},
        operations=[parallel_map_config],
        steps=[
            PipelineStep(
                name="parallel_map_step",
                input="test_input",
                operations=["sentiment_and_word_count"],
            ),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
    )

    cost = pipeline.run(max_threads=4)

    assert isinstance(cost, float)


def test_filter_pipeline(
    filter_config, temp_input_file, temp_output_file, temp_intermediate_dir
):
    pipeline = Pipeline(
        name="test_pipeline",
        datasets={"test_input": Dataset(type="file", path=temp_input_file)},
        operations=[filter_config],
        steps=[
            PipelineStep(
                name="filter_step",
                input="test_input",
                operations=["positive_sentiment_filter"],
            ),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
    )

    cost = pipeline.run(max_threads=4)

    assert isinstance(cost, float)


def test_reduce_pipeline(
    reduce_config, reduce_sample_data, temp_output_file, temp_intermediate_dir
):
    pipeline = Pipeline(
        name="test_pipeline",
        datasets={"test_input": Dataset(type="file", path=reduce_sample_data)},
        operations=[reduce_config],
        steps=[
            PipelineStep(
                name="reduce_step", input="test_input", operations=["group_summary"]
            ),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
    )

    cost = pipeline.run(max_threads=4)

    assert isinstance(cost, float)


def test_resolve_pipeline(
    resolve_config, resolve_sample_data, temp_output_file, temp_intermediate_dir
):
    pipeline = Pipeline(
        name="test_pipeline",
        datasets={"test_input": Dataset(type="file", path=resolve_sample_data)},
        operations=[resolve_config],
        steps=[
            PipelineStep(
                name="resolve_step",
                input="test_input",
                operations=["name_email_resolver"],
            ),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
    )

    cost = pipeline.run(max_threads=4)

    assert isinstance(cost, float)


def test_equijoin_pipeline(
    left_data, right_data, temp_output_file, temp_intermediate_dir
):
    pipeline = Pipeline(
        name="test_pipeline",
        datasets={
            "left": Dataset(type="file", path=left_data),
            "right": Dataset(type="file", path=right_data),
        },
        operations=[
            EquijoinOp(
                name="user_data_join",
                type="equijoin",
                left="left",
                right="right",
                comparison_prompt="Compare the following two entries and determine if they are the same id: Left: {{ left.id }} Right: {{ right.id }}",
                embedding_model="text-embedding-3-small",
                comparison_model="gpt-4o-mini",
            )
        ],
        steps=[
            PipelineStep(
                name="equijoin_step",
                operations=[
                    {
                        "user_data_join": {
                            "left": "left",
                            "right": "right",
                        }
                    }
                ],
            ),
        ],
        output=PipelineOutput(
            type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
        ),
        default_model="gpt-4o-mini",
    )

    cost = pipeline.run(max_threads=4)

    assert isinstance(cost, float)