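"""LangGraph workflow for ingesting a source.

The graph extracts content with content-core, saves it onto an existing
Source record (optionally embedding it for vector search), then fans out
one branch per requested transformation.
"""
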
import operator
from typing import Any, Dict, List, Optional

from content_core import extract_content
from content_core.common import ProcessSourceState
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send
from loguru import logger
from typing_extensions import Annotated, TypedDict

from open_notebook.ai.models import Model, ModelManager
from open_notebook.domain.content_settings import ContentSettings
from open_notebook.domain.notebook import Asset, Source
from open_notebook.domain.transformation import Transformation
from open_notebook.graphs.transformation import graph as transform_graph


class SourceState(TypedDict):
    """Full state for the source-processing graph."""

    content_state: ProcessSourceState
    apply_transformations: List[Transformation]
    source_id: str
    notebook_ids: List[str]
    source: Source
    # Reducer: each parallel transform_content branch appends its result here.
    transformation: Annotated[list, operator.add]
    embed: bool


class TransformationState(TypedDict):
    """Per-branch state handed to transform_content via Send()."""

    source: Source
    transformation: Transformation


async def content_process(state: SourceState) -> dict:
    """Configure extraction engines and run content-core extraction."""
    content_settings = ContentSettings(
        default_content_processing_engine_doc="auto",
        default_content_processing_engine_url="auto",
        default_embedding_option="ask",
        auto_delete_files="yes",
        youtube_preferred_languages=["en", "pt", "es", "de", "nl", "en-GB", "fr", "hi", "ja"],
    )
    # The incoming content_state is still dict-like here; extract_content
    # returns the populated ProcessSourceState.
    content_state: Dict[str, Any] = state["content_state"]  # type: ignore[assignment]

    content_state["url_engine"] = (
        content_settings.default_content_processing_engine_url or "auto"
    )
    content_state["document_engine"] = (
        content_settings.default_content_processing_engine_doc or "auto"
    )
    content_state["output_format"] = "markdown"

    # Add speech-to-text model configuration from Default Models
    try:
        model_manager = ModelManager()
        defaults = await model_manager.get_defaults()
        if defaults.default_speech_to_text_model:
            stt_model = await Model.get(defaults.default_speech_to_text_model)
            if stt_model:
                content_state["audio_provider"] = stt_model.provider
                content_state["audio_model"] = stt_model.name
                logger.debug(
                    f"Using speech-to-text model: {stt_model.provider}/{stt_model.name}"
                )
    except Exception as e:
        logger.warning(f"Failed to retrieve speech-to-text model configuration: {e}")
        # Continue without a custom audio model (content-core will use its default)

    processed_state = await extract_content(content_state)
    return {"content_state": processed_state}


async def save_source(state: SourceState) -> dict:
    """Persist the processed content onto the existing Source record."""
    content_state = state["content_state"]

    # Get the existing source using the provided source_id
    source = await Source.get(state["source_id"])
    if not source:
        raise ValueError(f"Source with ID {state['source_id']} not found")

    # Update the source with the processed content
    source.asset = Asset(url=content_state.url, file_path=content_state.file_path)
    source.full_text = content_state.content

    # Preserve the existing title if the processed content provides none
    if content_state.title:
        source.title = content_state.title

    await source.save()

    # NOTE: Notebook associations are created by the API immediately for UI
    # responsiveness; creating them here as well would produce duplicate edges.

    if state["embed"]:
        logger.debug("Embedding content for vector search")
        await source.vectorize()

    return {"source": source}


def trigger_transformations(state: SourceState, config: RunnableConfig) -> List[Send]:
    """Fan out one transform_content branch per requested transformation."""
    to_apply = state["apply_transformations"]
    if not to_apply:
        return []

    logger.debug(f"Applying transformations {to_apply}")

    return [
        Send(
            "transform_content",
            {
                "source": state["source"],
                "transformation": t,
            },
        )
        for t in to_apply
    ]


async def transform_content(state: TransformationState) -> Optional[dict]:
    """Apply a single transformation to the source text and store the insight."""
    source = state["source"]
    content = source.full_text
    if not content:
        return None

    transformation: Transformation = state["transformation"]
    logger.debug(f"Applying transformation {transformation.name}")
    result = await transform_graph.ainvoke(
        dict(input_text=content, transformation=transformation)  # type: ignore[arg-type]
    )
    await source.add_insight(transformation.title, result["output"])

    # The list wrapper feeds the operator.add reducer on SourceState["transformation"].
    return {
        "transformation": [
            {
                "output": result["output"],
                "transformation_name": transformation.name,
            }
        ]
    }


# Create and compile the workflow
workflow = StateGraph(SourceState)

# Add nodes
workflow.add_node("content_process", content_process)
workflow.add_node("save_source", save_source)
workflow.add_node("transform_content", transform_content)

# Define the graph edges
workflow.add_edge(START, "content_process")
workflow.add_edge("content_process", "save_source")
workflow.add_conditional_edges(
    "save_source", trigger_transformations, ["transform_content"]
)
workflow.add_edge("transform_content", END)

# Compile the graph
source_graph = workflow.compile()
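
# Example invocation (a sketch, not part of this module; the input shape is
# assumed from the SourceState fields above and all values are illustrative):
#
#     await source_graph.ainvoke(
#         {
#             "content_state": {"url": "https://example.com/article"},
#             "source_id": "source:abc123",
#             "notebook_ids": [],
#             "apply_transformations": [],
#             "embed": False,
#         }
#     )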