176 lines
5.5 KiB
Python
176 lines
5.5 KiB
Python
import shutil
|
|
import time
|
|
import pytest
|
|
import json
|
|
import tempfile
|
|
import os
|
|
from docetl.api import Pipeline, Dataset, MapOp, PipelineStep, PipelineOutput
|
|
|
|
|
|
@pytest.fixture
|
|
def temp_input_file():
|
|
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp:
|
|
json.dump(
|
|
[
|
|
{"text": "This is a test sentence."},
|
|
{"text": "Another test sentence."},
|
|
],
|
|
tmp,
|
|
)
|
|
yield tmp.name
|
|
os.unlink(tmp.name)
|
|
|
|
|
|
@pytest.fixture
|
|
def temp_output_file():
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as tmp:
|
|
pass
|
|
yield tmp.name
|
|
os.unlink(tmp.name)
|
|
|
|
|
|
@pytest.fixture
|
|
def temp_intermediate_dir():
|
|
with tempfile.TemporaryDirectory() as tmpdirname:
|
|
yield tmpdirname
|
|
|
|
|
|
def create_pipeline(input_file, output_file, intermediate_dir, operation_prompt, bypass_cache=False):
|
|
return Pipeline(
|
|
name="test_pipeline",
|
|
datasets={"test_input": Dataset(type="file", path=input_file)},
|
|
operations=[
|
|
MapOp(
|
|
name="test_operation",
|
|
type="map",
|
|
prompt=operation_prompt,
|
|
output={"schema": {"result": "string"}},
|
|
bypass_cache=bypass_cache,
|
|
)
|
|
],
|
|
steps=[
|
|
PipelineStep(
|
|
name="test_step", input="test_input", operations=["test_operation"]
|
|
),
|
|
],
|
|
output=PipelineOutput(
|
|
type="file", path=output_file, intermediate_dir=intermediate_dir
|
|
),
|
|
default_model="gpt-4o-mini",
|
|
)
|
|
|
|
|
|
# def test_pipeline_rerun_on_operation_change(
|
|
# temp_input_file, temp_output_file, temp_intermediate_dir
|
|
# ):
|
|
# # Initial run
|
|
# initial_prompt = "Analyze the sentiment of the following text: '{{ input.text }}'"
|
|
# pipeline = create_pipeline(
|
|
# temp_input_file, temp_output_file, temp_intermediate_dir, initial_prompt
|
|
# )
|
|
# initial_cost = pipeline.run()
|
|
|
|
# # Check that intermediate files were created
|
|
# assert os.path.exists(
|
|
# os.path.join(temp_intermediate_dir, "test_step", "test_operation.json")
|
|
# )
|
|
|
|
# # Run without modifying the operation
|
|
# unmodified_cost = pipeline.run()
|
|
|
|
# # Check that the pipeline was not rerun (cost should be zero)
|
|
# assert unmodified_cost == 0
|
|
|
|
|
|
# # Modify the operation
|
|
# modified_prompt = "Count the words in the following text: '{{ input.text }}'"
|
|
# modified_pipeline = create_pipeline(
|
|
# temp_input_file, temp_output_file, temp_intermediate_dir, modified_prompt, bypass_cache=True
|
|
# )
|
|
|
|
# modified_cost = modified_pipeline.run()
|
|
|
|
|
|
|
|
# # Check that the intermediate files were updated
|
|
# with open(
|
|
# os.path.join(temp_intermediate_dir, "test_step", "test_operation.json"), "r"
|
|
# ) as f:
|
|
# intermediate_data = json.load(f)
|
|
# assert any("word" in str(item).lower() for item in intermediate_data)
|
|
|
|
# # Check that the cost > 0
|
|
# assert modified_cost > 0
|
|
|
|
|
|
# Test with an incorrect later operation but correct earlier operation
|
|
def test_partial_caching(temp_input_file, temp_output_file, temp_intermediate_dir):
|
|
# Create initial pipeline with two operations
|
|
initial_pipeline = Pipeline(
|
|
name="test_pipeline",
|
|
datasets={"test_input": Dataset(type="file", path=temp_input_file)},
|
|
operations=[
|
|
MapOp(
|
|
name="first_operation",
|
|
type="map",
|
|
prompt="Analyze the sentiment of the following text: '{{ input.text }}'",
|
|
output={"schema": {"sentiment": "string"}},
|
|
model="gpt-4o-mini",
|
|
),
|
|
MapOp(
|
|
name="second_operation_bad",
|
|
type="map",
|
|
prompt="Summarize the following text: '{{ forororororo }}'",
|
|
output={"schema": {"summary": "1000"}},
|
|
model="gpt-4o-mini",
|
|
),
|
|
],
|
|
steps=[
|
|
PipelineStep(
|
|
name="first_step",
|
|
input="test_input",
|
|
operations=["first_operation", "second_operation_bad"],
|
|
),
|
|
],
|
|
output=PipelineOutput(
|
|
type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
|
|
),
|
|
default_model="gpt-4o-mini",
|
|
)
|
|
|
|
# Run the initial pipeline
|
|
# Run the initial pipeline with an expected error
|
|
with pytest.raises(Exception):
|
|
initial_cost = initial_pipeline.run()
|
|
|
|
new_pipeline_with_only_one_op = Pipeline(
|
|
name="test_pipeline",
|
|
datasets={"test_input": Dataset(type="file", path=temp_input_file)},
|
|
operations=[
|
|
MapOp(
|
|
name="first_operation",
|
|
type="map",
|
|
prompt="Analyze the sentiment of the following text: '{{ input.text }}'",
|
|
output={"schema": {"sentiment": "string"}},
|
|
model="gpt-4o-mini",
|
|
),
|
|
],
|
|
steps=[
|
|
PipelineStep(
|
|
name="first_step",
|
|
input="test_input",
|
|
operations=["first_operation"],
|
|
),
|
|
],
|
|
output=PipelineOutput(
|
|
type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir
|
|
),
|
|
default_model="gpt-4o-mini",
|
|
)
|
|
rerun_cost = new_pipeline_with_only_one_op.run()
|
|
|
|
# Assert that the cost was 0 when rerunning the pipeline
|
|
assert (
|
|
rerun_cost == 0
|
|
), "Expected zero cost when rerunning the pipeline without changes"
|