474 lines
19 KiB
Python
474 lines
19 KiB
Python
"""
|
|
API client for Open Notebook API.
|
|
This module provides a client interface to interact with the Open Notebook API.
|
|
"""
|
|
|
|
import os
|
|
from typing import Any, Dict, List, Optional, Union
|
|
|
|
import httpx
|
|
from loguru import logger
|
|
|
|
|
|
class APIClient:
    """Synchronous HTTP client for the Open Notebook REST API.

    Configuration is read from the environment when an instance is created:

    * ``API_BASE_URL`` - server root, used when ``base_url`` is not passed
      (default ``http://127.0.0.1:5055``).
    * ``API_CLIENT_TIMEOUT`` - default request timeout in seconds
      (default 300, clamped to the 30-3600 range).
    * ``OPEN_NOTEBOOK_PASSWORD`` - when set, sent as a bearer token in the
      ``Authorization`` header of every request.

    Every public method builds a payload and delegates to
    :meth:`_make_request`, which raises ``ConnectionError`` on transport
    failures and ``RuntimeError`` on HTTP error statuses.
    """

    def __init__(self, base_url: Optional[str] = None):
        """Create a client.

        Args:
            base_url: Root URL of the API. Falls back to ``API_BASE_URL`` and
                then to ``http://127.0.0.1:5055`` when omitted or empty.
        """
        self.base_url = base_url or os.getenv("API_BASE_URL", "http://127.0.0.1:5055")

        # Timeout defaults to 5 minutes (300s) to accommodate slow LLM operations
        # (transformations, insights) on slower hardware (Ollama, LM Studio,
        # remote APIs). Configurable via API_CLIENT_TIMEOUT (in seconds).
        self.timeout = self._resolve_timeout(os.getenv("API_CLIENT_TIMEOUT", "300.0"))

        # Add authentication header if password is set
        self.headers: Dict[str, str] = {}
        password = os.getenv("OPEN_NOTEBOOK_PASSWORD")
        if password:
            self.headers["Authorization"] = f"Bearer {password}"

    @staticmethod
    def _resolve_timeout(timeout_str: str) -> float:
        """Parse ``API_CLIENT_TIMEOUT`` and clamp it to 30s - 3600s.

        Unparseable values, and NaN (which would otherwise slip past both
        bound comparisons), fall back to the 300s default.
        """
        import math

        try:
            timeout_value = float(timeout_str)
            if math.isnan(timeout_value):
                # NaN compares False against every bound, so reject it here.
                raise ValueError("timeout is NaN")
        except ValueError:
            logger.error(f"Invalid API_CLIENT_TIMEOUT value '{timeout_str}', using default 300s")
            return 300.0

        # Validate timeout is within reasonable bounds (30s - 3600s / 1 hour)
        if timeout_value < 30:
            logger.warning(f"API_CLIENT_TIMEOUT={timeout_value}s is too low, using minimum of 30s")
            return 30.0
        if timeout_value > 3600:
            logger.warning(f"API_CLIENT_TIMEOUT={timeout_value}s is too high, using maximum of 3600s")
            return 3600.0
        return timeout_value

    def _build_url(self, endpoint: str) -> str:
        """Join ``base_url`` and ``endpoint`` without doubling the slash."""
        return f"{self.base_url.rstrip('/')}{endpoint}"

    def _merge_headers(self, extra: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Return a new dict of ``extra`` overlaid with the client's headers.

        The caller's mapping is copied, never mutated; the client's own
        headers (e.g. ``Authorization``) win on key conflicts.
        """
        merged = dict(extra or {})
        merged.update(self.headers)
        return merged

    @staticmethod
    def _as_list(result: Any) -> List[Dict[Any, Any]]:
        """Return ``result`` unchanged if it is a list, else wrap it in one."""
        return result if isinstance(result, list) else [result]

    def _make_request(
        self, method: str, endpoint: str, timeout: Optional[float] = None, **kwargs
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Make HTTP request to the API and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            endpoint: Path appended to ``base_url``, e.g. ``"/api/notes"``.
            timeout: Per-request timeout in seconds; ``None`` uses
                ``self.timeout``.
            **kwargs: Forwarded to ``httpx.Client.request`` (``params``,
                ``json``, ``headers``, ...).

        Raises:
            ConnectionError: On ``httpx.RequestError`` (transport failure).
            RuntimeError: On ``httpx.HTTPStatusError`` (error status code).
            Exception: Anything else is logged and re-raised unchanged.
        """
        url = self._build_url(endpoint)
        request_timeout = timeout if timeout is not None else self.timeout

        # Merge headers into a fresh dict so the caller's object is untouched.
        kwargs["headers"] = self._merge_headers(kwargs.get("headers"))

        try:
            with httpx.Client(timeout=request_timeout) as client:
                response = client.request(method, url, **kwargs)
                response.raise_for_status()
                return response.json()
        except httpx.RequestError as e:
            logger.error(f"Request error for {method} {url}: {str(e)}")
            raise ConnectionError(f"Failed to connect to API: {str(e)}") from e
        except httpx.HTTPStatusError as e:
            logger.error(
                f"HTTP error {e.response.status_code} for {method} {url}: {e.response.text}"
            )
            raise RuntimeError(
                f"API request failed: {e.response.status_code} - {e.response.text}"
            ) from e
        except Exception as e:
            logger.error(f"Unexpected error for {method} {url}: {str(e)}")
            raise

    # Notebooks API methods
    def get_notebooks(
        self, archived: Optional[bool] = None, order_by: str = "updated desc"
    ) -> List[Dict[Any, Any]]:
        """Get all notebooks (``GET /api/notebooks``).

        ``archived`` is sent as ``"true"``/``"false"`` only when not ``None``.
        A non-list response is wrapped in a single-element list.
        """
        params: Dict[str, Any] = {"order_by": order_by}
        if archived is not None:
            params["archived"] = str(archived).lower()

        result = self._make_request("GET", "/api/notebooks", params=params)
        return self._as_list(result)

    def create_notebook(
        self, name: str, description: str = ""
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Create a new notebook (``POST /api/notebooks``)."""
        data = {"name": name, "description": description}
        return self._make_request("POST", "/api/notebooks", json=data)

    def get_notebook(self, notebook_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get a specific notebook (``GET /api/notebooks/{notebook_id}``)."""
        return self._make_request("GET", f"/api/notebooks/{notebook_id}")

    def update_notebook(
        self, notebook_id: str, **updates
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Update a notebook; ``updates`` is sent as the JSON body of a PUT."""
        return self._make_request("PUT", f"/api/notebooks/{notebook_id}", json=updates)

    def delete_notebook(self, notebook_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Delete a notebook (``DELETE /api/notebooks/{notebook_id}``)."""
        return self._make_request("DELETE", f"/api/notebooks/{notebook_id}")

    # Search API methods
    def search(
        self,
        query: str,
        search_type: str = "text",
        limit: int = 100,
        search_sources: bool = True,
        search_notes: bool = True,
        minimum_score: float = 0.2,
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Search the knowledge base (``POST /api/search``).

        ``search_type`` is sent under the ``"type"`` key of the payload.
        """
        data = {
            "query": query,
            "type": search_type,
            "limit": limit,
            "search_sources": search_sources,
            "search_notes": search_notes,
            "minimum_score": minimum_score,
        }
        return self._make_request("POST", "/api/search", json=data)

    def ask_simple(
        self,
        question: str,
        strategy_model: str,
        answer_model: str,
        final_answer_model: str,
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Ask the knowledge base a question (simple, non-streaming)."""
        data = {
            "question": question,
            "strategy_model": strategy_model,
            "answer_model": answer_model,
            "final_answer_model": final_answer_model,
        }
        # Use configured timeout for long-running ask operations
        return self._make_request(
            "POST", "/api/search/ask/simple", json=data, timeout=self.timeout
        )

    # Models API methods
    def get_models(self, model_type: Optional[str] = None) -> List[Dict[Any, Any]]:
        """Get all models, optionally filtered by ``type`` query parameter.

        A non-list response is wrapped in a single-element list.
        """
        params: Dict[str, Any] = {}
        if model_type:
            params["type"] = model_type
        result = self._make_request("GET", "/api/models", params=params)
        return self._as_list(result)

    def create_model(
        self, name: str, provider: str, model_type: str
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Create a new model (``POST /api/models``)."""
        data = {
            "name": name,
            "provider": provider,
            "type": model_type,
        }
        return self._make_request("POST", "/api/models", json=data)

    def delete_model(self, model_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Delete a model (``DELETE /api/models/{model_id}``)."""
        return self._make_request("DELETE", f"/api/models/{model_id}")

    def get_default_models(self) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get default model assignments (``GET /api/models/defaults``)."""
        return self._make_request("GET", "/api/models/defaults")

    def update_default_models(self, **defaults) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Update default model assignments; ``defaults`` is the PUT body."""
        return self._make_request("PUT", "/api/models/defaults", json=defaults)

    # Transformations API methods
    def get_transformations(self) -> List[Dict[Any, Any]]:
        """Get all transformations; a non-list response is wrapped in a list."""
        result = self._make_request("GET", "/api/transformations")
        return self._as_list(result)

    def create_transformation(
        self,
        name: str,
        title: str,
        description: str,
        prompt: str,
        apply_default: bool = False,
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Create a new transformation (``POST /api/transformations``)."""
        data = {
            "name": name,
            "title": title,
            "description": description,
            "prompt": prompt,
            "apply_default": apply_default,
        }
        return self._make_request("POST", "/api/transformations", json=data)

    def get_transformation(
        self, transformation_id: str
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get a specific transformation."""
        return self._make_request("GET", f"/api/transformations/{transformation_id}")

    def update_transformation(
        self, transformation_id: str, **updates
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Update a transformation; ``updates`` is sent as the PUT body."""
        return self._make_request(
            "PUT", f"/api/transformations/{transformation_id}", json=updates
        )

    def delete_transformation(
        self, transformation_id: str
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Delete a transformation."""
        return self._make_request("DELETE", f"/api/transformations/{transformation_id}")

    def execute_transformation(
        self, transformation_id: str, input_text: str, model_id: str
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Execute a transformation on input text."""
        data = {
            "transformation_id": transformation_id,
            "input_text": input_text,
            "model_id": model_id,
        }
        # Use configured timeout for transformation operations
        return self._make_request(
            "POST", "/api/transformations/execute", json=data, timeout=self.timeout
        )

    # Notes API methods
    def get_notes(self, notebook_id: Optional[str] = None) -> List[Dict[Any, Any]]:
        """Get all notes, optionally filtered by ``notebook_id``.

        A non-list response is wrapped in a single-element list.
        """
        params: Dict[str, Any] = {}
        if notebook_id:
            params["notebook_id"] = notebook_id
        result = self._make_request("GET", "/api/notes", params=params)
        return self._as_list(result)

    def create_note(
        self,
        content: str,
        title: Optional[str] = None,
        note_type: str = "human",
        notebook_id: Optional[str] = None,
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Create a new note (``POST /api/notes``).

        ``title`` and ``notebook_id`` are included in the payload only when
        truthy.
        """
        data: Dict[str, Any] = {
            "content": content,
            "note_type": note_type,
        }
        if title:
            data["title"] = title
        if notebook_id:
            data["notebook_id"] = notebook_id
        return self._make_request("POST", "/api/notes", json=data)

    def get_note(self, note_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get a specific note (``GET /api/notes/{note_id}``)."""
        return self._make_request("GET", f"/api/notes/{note_id}")

    def update_note(self, note_id: str, **updates) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Update a note; ``updates`` is sent as the PUT body."""
        return self._make_request("PUT", f"/api/notes/{note_id}", json=updates)

    def delete_note(self, note_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Delete a note (``DELETE /api/notes/{note_id}``)."""
        return self._make_request("DELETE", f"/api/notes/{note_id}")

    # Embedding API methods
    def embed_content(
        self, item_id: str, item_type: str, async_processing: bool = False
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Embed content for vector search (``POST /api/embed``)."""
        data = {
            "item_id": item_id,
            "item_type": item_type,
            "async_processing": async_processing,
        }
        # Use configured timeout for embedding operations
        return self._make_request("POST", "/api/embed", json=data, timeout=self.timeout)

    def rebuild_embeddings(
        self,
        mode: str = "existing",
        include_sources: bool = True,
        include_notes: bool = True,
        include_insights: bool = True,
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Rebuild embeddings in bulk.

        Note: This operation can take a long time for large databases.
        Consider increasing API_CLIENT_TIMEOUT to 600-900s for bulk rebuilds.
        """
        data = {
            "mode": mode,
            "include_sources": include_sources,
            "include_notes": include_notes,
            "include_insights": include_insights,
        }
        # Use double the configured timeout for bulk rebuild operations,
        # capped at 3600s but never below the configured value itself.
        rebuild_timeout = max(self.timeout, min(self.timeout * 2, 3600.0))
        return self._make_request(
            "POST", "/api/embeddings/rebuild", json=data, timeout=rebuild_timeout
        )

    def get_rebuild_status(self, command_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get status of a rebuild operation."""
        return self._make_request("GET", f"/api/embeddings/rebuild/{command_id}/status")

    # Settings API methods
    def get_settings(self) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get all application settings (``GET /api/settings``)."""
        return self._make_request("GET", "/api/settings")

    def update_settings(self, **settings) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Update application settings; ``settings`` is sent as the PUT body."""
        return self._make_request("PUT", "/api/settings", json=settings)

    # Context API methods
    def get_notebook_context(
        self, notebook_id: str, context_config: Optional[Dict] = None
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get context for a notebook.

        ``context_config`` is included in the payload only when truthy.
        Returns the response when it is a dict, otherwise an empty dict.
        """
        data: Dict[str, Any] = {"notebook_id": notebook_id}
        if context_config:
            data["context_config"] = context_config
        result = self._make_request(
            "POST", f"/api/notebooks/{notebook_id}/context", json=data
        )
        return result if isinstance(result, dict) else {}

    # Sources API methods
    def get_sources(self, notebook_id: Optional[str] = None) -> List[Dict[Any, Any]]:
        """Get all sources, optionally filtered by ``notebook_id``.

        A non-list response is wrapped in a single-element list.
        """
        params: Dict[str, Any] = {}
        if notebook_id:
            params["notebook_id"] = notebook_id
        result = self._make_request("GET", "/api/sources", params=params)
        return self._as_list(result)

    def create_source(
        self,
        notebook_id: Optional[str] = None,
        notebooks: Optional[List[str]] = None,
        source_type: str = "text",
        url: Optional[str] = None,
        file_path: Optional[str] = None,
        content: Optional[str] = None,
        title: Optional[str] = None,
        transformations: Optional[List[str]] = None,
        embed: bool = False,
        delete_source: bool = False,
        async_processing: bool = False,
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Create a new source (``POST /api/sources/json``).

        ``notebooks`` takes precedence over ``notebook_id`` when both are
        given. Optional fields are included in the payload only when truthy.

        Raises:
            ValueError: If neither ``notebook_id`` nor ``notebooks`` is
                provided.
        """
        data: Dict[str, Any] = {
            "type": source_type,
            "embed": embed,
            "delete_source": delete_source,
            "async_processing": async_processing,
        }

        # Handle backward compatibility for notebook_id vs notebooks
        if notebooks:
            data["notebooks"] = notebooks
        elif notebook_id:
            data["notebook_id"] = notebook_id
        else:
            raise ValueError("Either notebook_id or notebooks must be provided")

        if url:
            data["url"] = url
        if file_path:
            data["file_path"] = file_path
        if content:
            data["content"] = content
        if title:
            data["title"] = title
        if transformations:
            data["transformations"] = transformations

        # Use configured timeout for source creation (especially PDF processing with OCR)
        return self._make_request("POST", "/api/sources/json", json=data, timeout=self.timeout)

    def get_source(self, source_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get a specific source (``GET /api/sources/{source_id}``)."""
        return self._make_request("GET", f"/api/sources/{source_id}")

    def get_source_status(self, source_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get processing status for a source."""
        return self._make_request("GET", f"/api/sources/{source_id}/status")

    def update_source(
        self, source_id: str, **updates
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Update a source; ``updates`` is sent as the PUT body."""
        return self._make_request("PUT", f"/api/sources/{source_id}", json=updates)

    def delete_source(self, source_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Delete a source (``DELETE /api/sources/{source_id}``)."""
        return self._make_request("DELETE", f"/api/sources/{source_id}")

    # Insights API methods
    def get_source_insights(self, source_id: str) -> List[Dict[Any, Any]]:
        """Get all insights for a specific source.

        A non-list response is wrapped in a single-element list.
        """
        result = self._make_request("GET", f"/api/sources/{source_id}/insights")
        return self._as_list(result)

    def get_insight(self, insight_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get a specific insight (``GET /api/insights/{insight_id}``)."""
        return self._make_request("GET", f"/api/insights/{insight_id}")

    def delete_insight(self, insight_id: str) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Delete a specific insight."""
        return self._make_request("DELETE", f"/api/insights/{insight_id}")

    def save_insight_as_note(
        self, insight_id: str, notebook_id: Optional[str] = None
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Convert an insight to a note.

        ``notebook_id`` is included in the payload only when truthy.
        """
        data: Dict[str, Any] = {}
        if notebook_id:
            data["notebook_id"] = notebook_id
        return self._make_request(
            "POST", f"/api/insights/{insight_id}/save-as-note", json=data
        )

    def create_source_insight(
        self, source_id: str, transformation_id: str, model_id: Optional[str] = None
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Create a new insight for a source by running a transformation.

        ``model_id`` is included in the payload only when truthy.
        """
        data: Dict[str, Any] = {"transformation_id": transformation_id}
        if model_id:
            data["model_id"] = model_id
        return self._make_request(
            "POST", f"/api/sources/{source_id}/insights", json=data
        )

    # Episode Profiles API methods
    def get_episode_profiles(self) -> List[Dict[Any, Any]]:
        """Get all episode profiles; a non-list response is wrapped in a list."""
        result = self._make_request("GET", "/api/episode-profiles")
        return self._as_list(result)

    def get_episode_profile(
        self, profile_name: str
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Get a specific episode profile by name."""
        return self._make_request("GET", f"/api/episode-profiles/{profile_name}")

    def create_episode_profile(
        self,
        name: str,
        description: str = "",
        speaker_config: str = "",
        outline_provider: str = "",
        outline_model: str = "",
        transcript_provider: str = "",
        transcript_model: str = "",
        default_briefing: str = "",
        num_segments: int = 5,
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Create a new episode profile (``POST /api/episode-profiles``)."""
        data = {
            "name": name,
            "description": description,
            "speaker_config": speaker_config,
            "outline_provider": outline_provider,
            "outline_model": outline_model,
            "transcript_provider": transcript_provider,
            "transcript_model": transcript_model,
            "default_briefing": default_briefing,
            "num_segments": num_segments,
        }
        return self._make_request("POST", "/api/episode-profiles", json=data)

    def update_episode_profile(
        self, profile_id: str, **updates
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Update an episode profile; ``updates`` is sent as the PUT body."""
        return self._make_request("PUT", f"/api/episode-profiles/{profile_id}", json=updates)

    def delete_episode_profile(
        self, profile_id: str
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """Delete an episode profile."""
        return self._make_request("DELETE", f"/api/episode-profiles/{profile_id}")
|
|
|
|
|
|
# Global client instance
|
|
# Shared module-level client. It is constructed at import time, so the
# API_BASE_URL, API_CLIENT_TIMEOUT and OPEN_NOTEBOOK_PASSWORD environment
# variables are read when this module is first imported.
api_client = APIClient()
|