open-webui/backend/open_webui/utils/middleware.py

import time
import logging
import sys
import os
import base64

import asyncio
from aiocache import cached
from typing import Any, Optional
import random
import json
import html
import inspect
import re
import ast

from uuid import uuid4
from concurrent.futures import ThreadPoolExecutor


from fastapi import Request, HTTPException
from starlette.responses import Response, StreamingResponse


from open_webui.models.chats import Chats
from open_webui.models.users import Users
from open_webui.socket.main import (
    get_event_call,
    get_event_emitter,
    get_active_status_by_user_id,
)
from open_webui.routers.tasks import (
    generate_queries,
    generate_title,
    generate_image_prompt,
    generate_chat_tags,
)
from open_webui.routers.retrieval import process_web_search, SearchForm
from open_webui.routers.images import image_generations, GenerateImageForm
from open_webui.routers.pipelines import (
    process_pipeline_inlet_filter,
    process_pipeline_outlet_filter,
)

from open_webui.utils.webhook import post_webhook
from open_webui.models.users import UserModel
from open_webui.models.functions import Functions
from open_webui.models.models import Models
from open_webui.retrieval.utils import get_sources_from_files
from open_webui.utils.chat import generate_chat_completion
from open_webui.utils.task import (
    get_task_model_id,
    rag_template,
    tools_function_calling_generation_template,
)
from open_webui.utils.misc import (
    deep_update,
    get_message_list,
    add_or_update_system_message,
    add_or_update_user_message,
    get_last_user_message,
    get_last_assistant_message,
    prepend_to_first_user_message_content,
    convert_logit_bias_input_to_json,
)
from open_webui.utils.tools import get_tools
from open_webui.utils.plugin import load_function_module_by_id
from open_webui.utils.filter import (
    get_sorted_filter_ids,
    process_filter_functions,
)
from open_webui.utils.code_interpreter import execute_code_jupyter

from open_webui.tasks import create_task
from open_webui.config import (
    CACHE_DIR,
    DEFAULT_TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE,
    DEFAULT_CODE_INTERPRETER_PROMPT,
)
from open_webui.env import (
    SRC_LOG_LEVELS,
    GLOBAL_LOG_LEVEL,
    BYPASS_MODEL_ACCESS_CONTROL,
    ENABLE_REALTIME_CHAT_SAVE,
)
from open_webui.constants import TASKS


logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])
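

# chat_completion_tools_handler: prompt-based ("default") tool calling. It asks the
# task model which tool to invoke, executes the matching tool (via the event caller
# for direct/tool-server tools, or the local callable otherwise), and either records
# the result as a citation source or appends it to the user message.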
async def chat_completion_tools_handler(
    request: Request, body: dict, extra_params: dict, user: UserModel, models, tools
) -> tuple[dict, dict]:
    async def get_content_from_response(response) -> Optional[str]:
        content = None
        if hasattr(response, "body_iterator"):
            async for chunk in response.body_iterator:
                data = json.loads(chunk.decode("utf-8"))
                content = data["choices"][0]["message"]["content"]

            # Cleanup any remaining background tasks if necessary
            if response.background is not None:
                await response.background()
        else:
            content = response["choices"][0]["message"]["content"]
        return content

    def get_tools_function_calling_payload(messages, task_model_id, content):
        user_message = get_last_user_message(messages)
        history = "\n".join(
            f"{message['role'].upper()}: \"\"\"{message['content']}\"\"\""
            for message in messages[::-1][:4]
        )

        prompt = f"History:\n{history}\nQuery: {user_message}"

        return {
            "model": task_model_id,
            "messages": [
                {"role": "system", "content": content},
                {"role": "user", "content": f"Query: {prompt}"},
            ],
            "stream": False,
            "metadata": {"task": str(TASKS.FUNCTION_CALLING)},
        }

    event_caller = extra_params["__event_call__"]
    metadata = extra_params["__metadata__"]

    task_model_id = get_task_model_id(
        body["model"],
        request.app.state.config.TASK_MODEL,
        request.app.state.config.TASK_MODEL_EXTERNAL,
        models,
    )

    skip_files = False
    sources = []

    specs = [tool["spec"] for tool in tools.values()]
    tools_specs = json.dumps(specs)

    if request.app.state.config.TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE != "":
        template = request.app.state.config.TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE
    else:
        template = DEFAULT_TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE

    tools_function_calling_prompt = tools_function_calling_generation_template(
        template, tools_specs
    )
    payload = get_tools_function_calling_payload(
        body["messages"], task_model_id, tools_function_calling_prompt
    )

    try:
        response = await generate_chat_completion(request, form_data=payload, user=user)
        log.debug(f"{response=}")
        content = await get_content_from_response(response)
        log.debug(f"{content=}")

        if not content:
            return body, {}

        try:
            content = content[content.find("{") : content.rfind("}") + 1]
            if not content:
                raise Exception("No JSON object found in the response")

            result = json.loads(content)

            async def tool_call_handler(tool_call):
                nonlocal skip_files

                log.debug(f"{tool_call=}")

                tool_function_name = tool_call.get("name", None)
                if tool_function_name not in tools:
                    return body, {}

                tool_function_params = tool_call.get("parameters", {})

                try:
                    tool = tools[tool_function_name]

                    spec = tool.get("spec", {})
                    allowed_params = (
                        spec.get("parameters", {}).get("properties", {}).keys()
                    )
                    tool_function_params = {
                        k: v
                        for k, v in tool_function_params.items()
                        if k in allowed_params
                    }

                    if tool.get("direct", False):
                        tool_result = await event_caller(
                            {
                                "type": "execute:tool",
                                "data": {
                                    "id": str(uuid4()),
                                    "name": tool_function_name,
                                    "params": tool_function_params,
                                    "server": tool.get("server", {}),
                                    "session_id": metadata.get("session_id", None),
                                },
                            }
                        )
                    else:
                        tool_function = tool["callable"]
                        tool_result = await tool_function(**tool_function_params)

                except Exception as e:
                    tool_result = str(e)

                tool_result_files = []
                if isinstance(tool_result, list):
                    for item in tool_result:
                        # check if string
                        if isinstance(item, str) and item.startswith("data:"):
                            tool_result_files.append(item)
                            tool_result.remove(item)

                if isinstance(tool_result, dict) or isinstance(tool_result, list):
                    tool_result = json.dumps(tool_result, indent=2)

                if isinstance(tool_result, str):
                    tool = tools[tool_function_name]
                    tool_id = tool.get("tool_id", "")

                    tool_name = (
                        f"{tool_id}/{tool_function_name}"
                        if tool_id
                        else f"{tool_function_name}"
                    )
                    if tool.get("metadata", {}).get("citation", False) or tool.get(
                        "direct", False
                    ):
                        # Citation is enabled for this tool
                        sources.append(
                            {
                                "source": {
                                    "name": (f"TOOL:{tool_name}"),
                                },
                                "document": [tool_result],
                                "metadata": [{"source": (f"TOOL:{tool_name}")}],
                            }
                        )
                    else:
                        # Citation is not enabled for this tool
                        body["messages"] = add_or_update_user_message(
                            f"\nTool `{tool_name}` Output: {tool_result}",
                            body["messages"],
                        )

                    if (
                        tools[tool_function_name]
                        .get("metadata", {})
                        .get("file_handler", False)
                    ):
                        skip_files = True

            # check if "tool_calls" in result
            if result.get("tool_calls"):
                for tool_call in result.get("tool_calls"):
                    await tool_call_handler(tool_call)
            else:
                await tool_call_handler(result)

        except Exception as e:
            log.debug(f"Error: {e}")
            content = None
    except Exception as e:
        log.debug(f"Error: {e}")
        content = None

    log.debug(f"tool_contexts: {sources}")

    if skip_files and "files" in body.get("metadata", {}):
        del body["metadata"]["files"]

    return body, {"sources": sources}
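

# chat_web_search_handler: generates search queries from the conversation, runs
# process_web_search for each query, and attaches the results to form_data["files"]
# while emitting status events to the client.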
async def chat_web_search_handler(
    request: Request, form_data: dict, extra_params: dict, user
):
    event_emitter = extra_params["__event_emitter__"]
    await event_emitter(
        {
            "type": "status",
            "data": {
                "action": "web_search",
                "description": "Generating search query",
                "done": False,
            },
        }
    )

    messages = form_data["messages"]
    user_message = get_last_user_message(messages)

    queries = []
    try:
        res = await generate_queries(
            request,
            {
                "model": form_data["model"],
                "messages": messages,
                "prompt": user_message,
                "type": "web_search",
            },
            user,
        )

        response = res["choices"][0]["message"]["content"]

        try:
            bracket_start = response.find("{")
            bracket_end = response.rfind("}") + 1

            if bracket_start == -1 or bracket_end == -1:
                raise Exception("No JSON object found in the response")

            response = response[bracket_start:bracket_end]
            queries = json.loads(response)
            queries = queries.get("queries", [])
        except Exception as e:
            queries = [response]

    except Exception as e:
        log.exception(e)
        queries = [user_message]

    if len(queries) == 0:
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "action": "web_search",
                    "description": "No search query generated",
                    "done": True,
                },
            }
        )
        return form_data

    all_results = []

    for searchQuery in queries:
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "action": "web_search",
                    "description": 'Searching "{{searchQuery}}"',
                    "query": searchQuery,
                    "done": False,
                },
            }
        )

        try:
            results = await process_web_search(
                request,
                SearchForm(
                    **{
                        "query": searchQuery,
                    }
                ),
                user=user,
            )

            if results:
                all_results.append(results)
                files = form_data.get("files", [])

                if results.get("collection_names"):
                    for col_idx, collection_name in enumerate(
                        results.get("collection_names")
                    ):
                        files.append(
                            {
                                "collection_name": collection_name,
                                "name": searchQuery,
                                "type": "web_search",
                                "urls": [results["filenames"][col_idx]],
                            }
                        )
                elif results.get("docs"):
                    # Invoked when bypass embedding and retrieval is set to True
                    docs = results["docs"]

                    if len(docs) == len(results["filenames"]):
                        # the number of docs and filenames (urls) should be the same
                        for doc_idx, doc in enumerate(docs):
                            files.append(
                                {
                                    "docs": [doc],
                                    "name": searchQuery,
                                    "type": "web_search",
                                    "urls": [results["filenames"][doc_idx]],
                                }
                            )
                    else:
                        # edge case when the number of docs and filenames (urls) are not the same
                        # this should not happen, but if it does, we will just append the docs
                        files.append(
                            {
                                "docs": results.get("docs", []),
                                "name": searchQuery,
                                "type": "web_search",
                                "urls": results["filenames"],
                            }
                        )

                form_data["files"] = files
        except Exception as e:
            log.exception(e)
            await event_emitter(
                {
                    "type": "status",
                    "data": {
                        "action": "web_search",
                        "description": 'Error searching "{{searchQuery}}"',
                        "query": searchQuery,
                        "done": True,
                        "error": True,
                    },
                }
            )

    if all_results:
        urls = []
        for results in all_results:
            if "filenames" in results:
                urls.extend(results["filenames"])

        await event_emitter(
            {
                "type": "status",
                "data": {
                    "action": "web_search",
                    "description": "Searched {{count}} sites",
                    "urls": urls,
                    "done": True,
                },
            }
        )
    else:
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "action": "web_search",
                    "description": "No search results found",
                    "done": True,
                    "error": True,
                },
            }
        )

    return form_data
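

# chat_image_generation_handler: optionally rewrites the last user message into an
# image prompt, calls image_generations, streams the resulting image back to the
# client as a message event, and records the outcome in a system message.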
async def chat_image_generation_handler(
    request: Request, form_data: dict, extra_params: dict, user
):
    __event_emitter__ = extra_params["__event_emitter__"]
    await __event_emitter__(
        {
            "type": "status",
            "data": {"description": "Generating an image", "done": False},
        }
    )

    messages = form_data["messages"]
    user_message = get_last_user_message(messages)

    prompt = user_message
    negative_prompt = ""

    if request.app.state.config.ENABLE_IMAGE_PROMPT_GENERATION:
        try:
            res = await generate_image_prompt(
                request,
                {
                    "model": form_data["model"],
                    "messages": messages,
                },
                user,
            )

            response = res["choices"][0]["message"]["content"]

            try:
                bracket_start = response.find("{")
                bracket_end = response.rfind("}") + 1

                if bracket_start == -1 or bracket_end == -1:
                    raise Exception("No JSON object found in the response")

                response = response[bracket_start:bracket_end]
                response = json.loads(response)
                prompt = response.get("prompt", [])
            except Exception as e:
                prompt = user_message

        except Exception as e:
            log.exception(e)
            prompt = user_message

    system_message_content = ""

    try:
        images = await image_generations(
            request=request,
            form_data=GenerateImageForm(**{"prompt": prompt}),
            user=user,
        )

        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": "Generated an image", "done": True},
            }
        )

        for image in images:
            await __event_emitter__(
                {
                    "type": "message",
                    "data": {"content": f"![Generated Image]({image['url']})\n"},
                }
            )

        system_message_content = "<context>User is shown the generated image, tell the user that the image has been generated</context>"
    except Exception as e:
        log.exception(e)
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"An error occurred while generating an image",
                    "done": True,
                },
            }
        )

        system_message_content = "<context>Unable to generate an image, tell the user that an error occurred</context>"

    if system_message_content:
        form_data["messages"] = add_or_update_system_message(
            system_message_content, form_data["messages"]
        )

    return form_data
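

# chat_completion_files_handler: generates retrieval queries for any attached files
# and collects matching sources via get_sources_from_files, which is offloaded to a
# thread pool so vector/database lookups do not block the event loop.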
async def chat_completion_files_handler(
    request: Request, body: dict, user: UserModel
) -> tuple[dict, dict[str, list]]:
    sources = []

    if files := body.get("metadata", {}).get("files", None):
        queries = []
        try:
            queries_response = await generate_queries(
                request,
                {
                    "model": body["model"],
                    "messages": body["messages"],
                    "type": "retrieval",
                },
                user,
            )
            queries_response = queries_response["choices"][0]["message"]["content"]

            try:
                bracket_start = queries_response.find("{")
                bracket_end = queries_response.rfind("}") + 1

                if bracket_start == -1 or bracket_end == -1:
                    raise Exception("No JSON object found in the response")

                queries_response = queries_response[bracket_start:bracket_end]
                queries_response = json.loads(queries_response)
            except Exception as e:
                queries_response = {"queries": [queries_response]}

            queries = queries_response.get("queries", [])
        except:
            pass

        if len(queries) == 0:
            queries = [get_last_user_message(body["messages"])]

        try:
            # Offload get_sources_from_files to a separate thread
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as executor:
                sources = await loop.run_in_executor(
                    executor,
                    lambda: get_sources_from_files(
                        request=request,
                        files=files,
                        queries=queries,
                        embedding_function=lambda query, prefix: request.app.state.EMBEDDING_FUNCTION(
                            query, prefix=prefix, user=user
                        ),
                        k=request.app.state.config.TOP_K,
                        reranking_function=request.app.state.rf,
                        k_reranker=request.app.state.config.TOP_K_RERANKER,
                        r=request.app.state.config.RELEVANCE_THRESHOLD,
                        hybrid_search=request.app.state.config.ENABLE_RAG_HYBRID_SEARCH,
                        full_context=request.app.state.config.RAG_FULL_CONTEXT,
                    ),
                )
        except Exception as e:
            log.exception(e)

    log.debug(f"rag_contexts:sources: {sources}")

    return body, {"sources": sources}
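

# apply_params_to_form_data: copies per-model "params" onto the request payload.
# For Ollama models the params are passed through as "options"; otherwise they are
# applied as top-level OpenAI-style fields, e.g. {"temperature": 0.2, "top_p": 0.9}
# becomes form_data["temperature"] / form_data["top_p"].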
def apply_params_to_form_data(form_data, model):
    params = form_data.pop("params", {})
    if model.get("ollama"):
        form_data["options"] = params

        if "format" in params:
            form_data["format"] = params["format"]

        if "keep_alive" in params:
            form_data["keep_alive"] = params["keep_alive"]
    else:
        if "seed" in params and params["seed"] is not None:
            form_data["seed"] = params["seed"]

        if "stop" in params and params["stop"] is not None:
            form_data["stop"] = params["stop"]

        if "temperature" in params and params["temperature"] is not None:
            form_data["temperature"] = params["temperature"]

        if "max_tokens" in params and params["max_tokens"] is not None:
            form_data["max_tokens"] = params["max_tokens"]

        if "top_p" in params and params["top_p"] is not None:
            form_data["top_p"] = params["top_p"]

        if "frequency_penalty" in params and params["frequency_penalty"] is not None:
            form_data["frequency_penalty"] = params["frequency_penalty"]

        if "reasoning_effort" in params and params["reasoning_effort"] is not None:
            form_data["reasoning_effort"] = params["reasoning_effort"]

        if "logit_bias" in params and params["logit_bias"] is not None:
            try:
                form_data["logit_bias"] = json.loads(
                    convert_logit_bias_input_to_json(params["logit_bias"])
                )
            except Exception as e:
                print(f"Error parsing logit_bias: {e}")

    return form_data
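

# process_chat_payload: prepares an incoming chat request before it is sent to the
# model -- applies params, runs pipeline/filter inlets, optional web search, image
# generation and code interpreter features, resolves server- and client-side tools,
# and injects retrieved context into the messages.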
async def process_chat_payload(request, form_data, user, metadata, model):
    form_data = apply_params_to_form_data(form_data, model)
    log.debug(f"form_data: {form_data}")

    event_emitter = get_event_emitter(metadata)
    event_call = get_event_call(metadata)

    extra_params = {
        "__event_emitter__": event_emitter,
        "__event_call__": event_call,
        "__user__": {
            "id": user.id,
            "email": user.email,
            "name": user.name,
            "role": user.role,
        },
        "__metadata__": metadata,
        "__request__": request,
        "__model__": model,
    }

    # Initialize events to store additional event to be sent to the client
    # Initialize contexts and citation
    if getattr(request.state, "direct", False) and hasattr(request.state, "model"):
        models = {
            request.state.model["id"]: request.state.model,
        }
    else:
        models = request.app.state.MODELS

    task_model_id = get_task_model_id(
        form_data["model"],
        request.app.state.config.TASK_MODEL,
        request.app.state.config.TASK_MODEL_EXTERNAL,
        models,
    )

    events = []
    sources = []

    user_message = get_last_user_message(form_data["messages"])
    model_knowledge = model.get("info", {}).get("meta", {}).get("knowledge", False)

    if model_knowledge:
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "action": "knowledge_search",
                    "query": user_message,
                    "done": False,
                },
            }
        )

        knowledge_files = []
        for item in model_knowledge:
            if item.get("collection_name"):
                knowledge_files.append(
                    {
                        "id": item.get("collection_name"),
                        "name": item.get("name"),
                        "legacy": True,
                    }
                )
            elif item.get("collection_names"):
                knowledge_files.append(
                    {
                        "name": item.get("name"),
                        "type": "collection",
                        "collection_names": item.get("collection_names"),
                        "legacy": True,
                    }
                )
            else:
                knowledge_files.append(item)

        files = form_data.get("files", [])
        files.extend(knowledge_files)
        form_data["files"] = files

    variables = form_data.pop("variables", None)

    # Process the form_data through the pipeline
    try:
        form_data = await process_pipeline_inlet_filter(
            request, form_data, user, models
        )
    except Exception as e:
        raise e

    try:
        filter_functions = [
            Functions.get_function_by_id(filter_id)
            for filter_id in get_sorted_filter_ids(model)
        ]

        form_data, flags = await process_filter_functions(
            request=request,
            filter_functions=filter_functions,
            filter_type="inlet",
            form_data=form_data,
            extra_params=extra_params,
        )
    except Exception as e:
        raise Exception(f"Error: {e}")

    features = form_data.pop("features", None)
    if features:
        if "web_search" in features and features["web_search"]:
            form_data = await chat_web_search_handler(
                request, form_data, extra_params, user
            )

        if "image_generation" in features and features["image_generation"]:
            form_data = await chat_image_generation_handler(
                request, form_data, extra_params, user
            )

        if "code_interpreter" in features and features["code_interpreter"]:
            form_data["messages"] = add_or_update_user_message(
                (
                    request.app.state.config.CODE_INTERPRETER_PROMPT_TEMPLATE
                    if request.app.state.config.CODE_INTERPRETER_PROMPT_TEMPLATE != ""
                    else DEFAULT_CODE_INTERPRETER_PROMPT
                ),
                form_data["messages"],
            )

    tool_ids = form_data.pop("tool_ids", None)
    files = form_data.pop("files", None)

    # Remove files duplicates
    if files:
        files = list({json.dumps(f, sort_keys=True): f for f in files}.values())

    metadata = {
        **metadata,
        "tool_ids": tool_ids,
        "files": files,
    }
    form_data["metadata"] = metadata

    # Server side tools
    tool_ids = metadata.get("tool_ids", None)
    # Client side tools
    tool_servers = metadata.get("tool_servers", None)

    log.debug(f"{tool_ids=}")
    log.debug(f"{tool_servers=}")

    tools_dict = {}

    if tool_ids:
        tools_dict = get_tools(
            request,
            tool_ids,
            user,
            {
                **extra_params,
                "__model__": models[task_model_id],
                "__messages__": form_data["messages"],
                "__files__": metadata.get("files", []),
            },
        )

    if tool_servers:
        for tool_server in tool_servers:
            tool_specs = tool_server.pop("specs", [])

            for tool in tool_specs:
                tools_dict[tool["name"]] = {
                    "spec": tool,
                    "direct": True,
                    "server": tool_server,
                }

    if tools_dict:
        if metadata.get("function_calling") == "native":
            # If the function calling is native, then call the tools function calling handler
            metadata["tools"] = tools_dict
            form_data["tools"] = [
                {"type": "function", "function": tool.get("spec", {})}
                for tool in tools_dict.values()
            ]
        else:
            # If the function calling is not native, then call the tools function calling handler
            try:
                form_data, flags = await chat_completion_tools_handler(
                    request, form_data, extra_params, user, models, tools_dict
                )
                sources.extend(flags.get("sources", []))

            except Exception as e:
                log.exception(e)

    try:
        form_data, flags = await chat_completion_files_handler(request, form_data, user)
        sources.extend(flags.get("sources", []))
    except Exception as e:
        log.exception(e)

    # If context is not empty, insert it into the messages
    if len(sources) > 0:
        context_string = ""
        citated_file_idx = {}
        for _, source in enumerate(sources, 1):
            if "document" in source:
                for doc_context, doc_meta in zip(
                    source["document"], source["metadata"]
                ):
                    file_id = doc_meta.get("file_id")
                    if file_id not in citated_file_idx:
                        citated_file_idx[file_id] = len(citated_file_idx) + 1
                    context_string += f'<source id="{citated_file_idx[file_id]}">{doc_context}</source>\n'

        context_string = context_string.strip()
        prompt = get_last_user_message(form_data["messages"])

        if prompt is None:
            raise Exception("No user message found")
        if (
            request.app.state.config.RELEVANCE_THRESHOLD == 0
            and context_string.strip() == ""
        ):
            log.debug(
                f"With a 0 relevancy threshold for RAG, the context cannot be empty"
            )

        # Workaround for Ollama 2.0+ system prompt issue
        # TODO: replace with add_or_update_system_message
        if model.get("owned_by") == "ollama":
            form_data["messages"] = prepend_to_first_user_message_content(
                rag_template(
                    request.app.state.config.RAG_TEMPLATE, context_string, prompt
                ),
                form_data["messages"],
            )
        else:
            form_data["messages"] = add_or_update_system_message(
                rag_template(
                    request.app.state.config.RAG_TEMPLATE, context_string, prompt
                ),
                form_data["messages"],
            )

    # If there are citations, add them to the data_items
    sources = [source for source in sources if source.get("source", {}).get("name", "")]

    if len(sources) > 0:
        events.append({"sources": sources})

    if model_knowledge:
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "action": "knowledge_search",
                    "query": user_message,
                    "done": True,
                    "hidden": True,
                },
            }
        )

    return form_data, metadata, events
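

# process_chat_response: post-processes the model response. Non-streaming responses
# are persisted and emitted directly; streaming (SSE/ndjson) responses are handled in
# a background task that parses the stream into typed content blocks (text, reasoning,
# tool calls, code interpreter) and runs follow-up tasks such as title/tag generation.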
async def process_chat_response(
2025-03-01 22:56:24 +08:00
request, response, form_data, user, metadata, model, events, tasks
2024-12-25 14:45:21 +08:00
):
2024-12-26 14:21:44 +08:00
async def background_tasks_handler():
message_map = Chats.get_messages_by_chat_id(metadata["chat_id"])
2024-12-31 03:29:18 +08:00
message = message_map.get(metadata["message_id"]) if message_map else None
2024-12-26 14:21:44 +08:00
if message:
messages = get_message_list(message_map, message.get("id"))
2025-02-04 10:36:49 +08:00
if tasks and messages:
2024-12-26 14:21:44 +08:00
if TASKS.TITLE_GENERATION in tasks:
if tasks[TASKS.TITLE_GENERATION]:
res = await generate_title(
request,
{
"model": message["model"],
"messages": messages,
"chat_id": metadata["chat_id"],
},
user,
)
2024-12-13 14:28:42 +08:00
2024-12-26 14:21:44 +08:00
if res and isinstance(res, dict):
2025-01-11 08:27:37 +08:00
if len(res.get("choices", [])) == 1:
2025-01-30 06:40:36 +08:00
title_string = (
2025-01-11 08:27:37 +08:00
res.get("choices", [])[0]
.get("message", {})
2025-01-30 06:40:36 +08:00
.get("content", message.get("content", "New Chat"))
)
2025-01-11 08:27:37 +08:00
else:
2025-01-30 06:40:36 +08:00
title_string = ""
title_string = title_string[
title_string.find("{") : title_string.rfind("}") + 1
]
try:
title = json.loads(title_string).get(
"title", "New Chat"
)
except Exception as e:
2025-01-30 12:03:46 +08:00
title = ""
2024-12-27 15:51:30 +08:00
if not title:
title = messages[0].get("content", "New Chat")
2024-12-26 14:21:44 +08:00
Chats.update_chat_title_by_id(metadata["chat_id"], title)
await event_emitter(
{
"type": "chat:title",
"data": title,
}
)
elif len(messages) == 2:
title = messages[0].get("content", "New Chat")
Chats.update_chat_title_by_id(metadata["chat_id"], title)
await event_emitter(
{
"type": "chat:title",
"data": message.get("content", "New Chat"),
}
)
if TASKS.TAGS_GENERATION in tasks and tasks[TASKS.TAGS_GENERATION]:
res = await generate_chat_tags(
request,
{
"model": message["model"],
"messages": messages,
"chat_id": metadata["chat_id"],
},
user,
)
if res and isinstance(res, dict):
2025-01-11 08:27:37 +08:00
if len(res.get("choices", [])) == 1:
tags_string = (
res.get("choices", [])[0]
.get("message", {})
.get("content", "")
)
else:
tags_string = ""
2024-12-26 14:21:44 +08:00
tags_string = tags_string[
tags_string.find("{") : tags_string.rfind("}") + 1
]
try:
tags = json.loads(tags_string).get("tags", [])
Chats.update_chat_tags_by_id(
metadata["chat_id"], tags, user
)
await event_emitter(
{
"type": "chat:tags",
"data": tags,
}
)
except Exception as e:
2025-01-11 08:27:37 +08:00
pass
2024-12-13 14:28:42 +08:00
2024-12-19 17:00:32 +08:00
event_emitter = None
2025-02-03 12:50:54 +08:00
event_caller = None
2024-12-21 10:37:25 +08:00
if (
"session_id" in metadata
and metadata["session_id"]
and "chat_id" in metadata
and metadata["chat_id"]
and "message_id" in metadata
and metadata["message_id"]
):
2024-12-19 17:00:32 +08:00
event_emitter = get_event_emitter(metadata)
2025-02-03 12:50:54 +08:00
event_caller = get_event_call(metadata)
2024-12-13 14:28:42 +08:00
2025-02-03 12:50:54 +08:00
# Non-streaming response
2024-12-26 14:21:44 +08:00
if not isinstance(response, StreamingResponse):
if event_emitter:
2025-03-28 15:52:13 +08:00
if "error" in response:
error = response["error"].get("detail", response["error"])
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"error": {"content": error},
},
)
2024-12-26 14:21:44 +08:00
if "selected_model_id" in response:
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"selectedModelId": response["selected_model_id"],
},
)
2025-03-28 15:52:13 +08:00
choices = response.get("choices", [])
if choices and choices[0].get("message", {}).get("content"):
2024-12-26 14:21:44 +08:00
content = response["choices"][0]["message"]["content"]
if content:
await event_emitter(
{
"type": "chat:completion",
"data": response,
}
)
title = Chats.get_chat_title_by_id(metadata["chat_id"])
await event_emitter(
{
"type": "chat:completion",
"data": {
"done": True,
"content": content,
"title": title,
},
}
)
# Save message in the database
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"content": content,
},
)
# Send a webhook notification if the user is not active
2024-12-31 07:39:35 +08:00
if get_active_status_by_user_id(user.id) is None:
2024-12-26 14:21:44 +08:00
webhook_url = Users.get_user_webhook_url_by_id(user.id)
if webhook_url:
post_webhook(
2025-02-16 16:11:18 +08:00
request.app.state.WEBUI_NAME,
2024-12-26 14:21:44 +08:00
webhook_url,
f"{title} - {request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}\n\n{content}",
{
"action": "chat",
"message": content,
"title": title,
"url": f"{request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}",
},
)
await background_tasks_handler()
return response
else:
return response
2025-02-03 12:50:54 +08:00
# Non standard response
2024-12-26 14:21:44 +08:00
if not any(
content_type in response.headers["Content-Type"]
for content_type in ["text/event-stream", "application/x-ndjson"]
):
return response
2025-02-25 17:00:29 +08:00
extra_params = {
"__event_emitter__": event_emitter,
"__event_call__": event_caller,
"__user__": {
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role,
},
"__metadata__": metadata,
"__request__": request,
2025-03-01 22:56:24 +08:00
"__model__": model,
2025-02-25 17:00:29 +08:00
}
2025-03-05 10:04:55 +08:00
filter_functions = [
Functions.get_function_by_id(filter_id)
for filter_id in get_sorted_filter_ids(model)
]
2025-03-01 22:56:24 +08:00
2025-02-03 12:50:54 +08:00
# Streaming response
if event_emitter and event_caller:
2024-12-19 17:00:32 +08:00
task_id = str(uuid4()) # Create a unique task ID.
2025-02-03 14:38:19 +08:00
model_id = form_data.get("model", "")
2024-12-13 14:28:42 +08:00
2025-02-04 10:17:38 +08:00
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"model": model_id,
},
)
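# Helpers used when a <code_interpreter> block interrupts generation: split trailing
# whitespace from the content, and detect whether the content currently ends inside
# an unclosed ``` code fence (an odd/even count of ``` segments).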
def split_content_and_whitespace(content):
content_stripped = content.rstrip()
2025-02-08 14:57:39 +08:00
original_whitespace = (
content[len(content_stripped) :]
if len(content) > len(content_stripped)
else ""
)
return content_stripped, original_whitespace
def is_opening_code_block(content):
2025-02-08 14:57:39 +08:00
backtick_segments = content.split("```")
# Even number of segments means the last backticks are opening a new block
return len(backtick_segments) > 1 and len(backtick_segments) % 2 == 0
2024-12-19 17:00:32 +08:00
# Handle as a background task
async def post_response_handler(response, events):
2025-02-03 16:03:41 +08:00
def serialize_content_blocks(content_blocks, raw=False):
2025-02-03 12:50:54 +08:00
content = ""
for block in content_blocks:
if block["type"] == "text":
content = f"{content}{block['content'].strip()}\n"
2025-02-05 15:05:14 +08:00
elif block["type"] == "tool_calls":
attributes = block.get("attributes", {})
2025-03-28 17:27:40 +08:00
tool_calls = block.get("content", [])
2025-02-05 15:05:14 +08:00
results = block.get("results", [])
if results:
2025-02-05 15:20:09 +08:00
2025-03-28 17:27:40 +08:00
tool_calls_display_content = ""
for tool_call in tool_calls:
2025-02-05 15:20:09 +08:00
2025-03-29 03:18:27 +08:00
tool_call_id = tool_call.get("id", "")
tool_name = tool_call.get("function", {}).get(
"name", ""
)
tool_arguments = tool_call.get("function", {}).get(
"arguments", ""
)
2025-03-28 17:27:40 +08:00
tool_result = None
2025-04-03 14:46:39 +08:00
tool_result_files = None
2025-03-28 17:27:40 +08:00
for result in results:
2025-03-29 03:18:27 +08:00
if tool_call_id == result.get("tool_call_id", ""):
tool_result = result.get("content", None)
2025-04-03 14:46:39 +08:00
tool_result_files = result.get("files", None)
2025-02-05 15:20:09 +08:00
break
2025-03-28 17:27:40 +08:00
if tool_result:
2025-04-03 14:46:39 +08:00
tool_calls_display_content = f'{tool_calls_display_content}\n<details type="tool_calls" done="true" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}" result="{html.escape(json.dumps(tool_result))}" files="{html.escape(json.dumps(tool_result_files)) if tool_result_files else ""}">\n<summary>Tool Executed</summary>\n</details>\n'
2025-03-28 17:27:40 +08:00
else:
2025-03-29 03:23:25 +08:00
tool_calls_display_content = f'{tool_calls_display_content}\n<details type="tool_calls" done="false" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}">\n<summary>Executing...</summary>\n</details>'
2025-02-05 15:20:09 +08:00
2025-02-05 15:05:14 +08:00
if not raw:
2025-03-29 03:23:25 +08:00
content = f"{content}\n{tool_calls_display_content}\n\n"
2025-02-05 15:05:14 +08:00
else:
2025-02-05 17:03:16 +08:00
tool_calls_display_content = ""
2025-03-28 17:27:40 +08:00
for tool_call in tool_calls:
2025-03-29 03:23:25 +08:00
tool_call_id = tool_call.get("id", "")
2025-03-29 03:18:27 +08:00
tool_name = tool_call.get("function", {}).get(
"name", ""
)
tool_arguments = tool_call.get("function", {}).get(
"arguments", ""
)
2025-03-29 03:23:25 +08:00
tool_calls_display_content = f'{tool_calls_display_content}\n<details type="tool_calls" done="false" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}">\n<summary>Executing...</summary>\n</details>'
2025-02-05 17:03:16 +08:00
2025-02-05 15:05:14 +08:00
if not raw:
2025-03-29 03:23:25 +08:00
content = f"{content}\n{tool_calls_display_content}\n\n"
2025-02-05 13:20:03 +08:00
2025-02-03 12:50:54 +08:00
elif block["type"] == "reasoning":
reasoning_display_content = "\n".join(
(f"> {line}" if not line.startswith(">") else line)
for line in block["content"].splitlines()
)
reasoning_duration = block.get("duration", None)
2025-02-06 06:26:09 +08:00
if reasoning_duration is not None:
2025-02-04 08:18:07 +08:00
if raw:
content = f'{content}\n<{block["start_tag"]}>{block["content"]}<{block["end_tag"]}>\n'
2025-02-04 08:18:07 +08:00
else:
2025-02-04 14:05:52 +08:00
content = f'{content}\n<details type="reasoning" done="true" duration="{reasoning_duration}">\n<summary>Thought for {reasoning_duration} seconds</summary>\n{reasoning_display_content}\n</details>\n'
2025-02-03 12:50:54 +08:00
else:
2025-02-04 08:18:07 +08:00
if raw:
content = f'{content}\n<{block["start_tag"]}>{block["content"]}<{block["end_tag"]}>\n'
2025-02-04 08:18:07 +08:00
else:
2025-02-04 14:05:52 +08:00
content = f'{content}\n<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{reasoning_display_content}\n</details>\n'
2025-02-03 12:50:54 +08:00
2025-02-03 14:38:19 +08:00
elif block["type"] == "code_interpreter":
attributes = block.get("attributes", {})
2025-02-03 16:03:41 +08:00
output = block.get("output", None)
2025-02-03 14:38:19 +08:00
lang = attributes.get("lang", "")
2025-02-08 14:57:39 +08:00
content_stripped, original_whitespace = (
split_content_and_whitespace(content)
)
if is_opening_code_block(content_stripped):
# Remove trailing backticks that would open a new block
2025-02-08 14:57:39 +08:00
content = (
content_stripped.rstrip("`").rstrip()
+ original_whitespace
)
else:
# Keep content as is - either closing backticks or no backticks
content = content_stripped + original_whitespace
2025-02-03 15:35:58 +08:00
if output:
2025-02-03 16:03:41 +08:00
output = html.escape(json.dumps(output))
if raw:
2025-02-04 14:05:52 +08:00
content = f'{content}\n<code_interpreter type="code" lang="{lang}">\n{block["content"]}\n</code_interpreter>\n```output\n{output}\n```\n'
2025-02-03 16:03:41 +08:00
else:
2025-02-04 14:05:52 +08:00
content = f'{content}\n<details type="code_interpreter" done="true" output="{output}">\n<summary>Analyzed</summary>\n```{lang}\n{block["content"]}\n```\n</details>\n'
2025-02-03 15:35:58 +08:00
else:
2025-02-04 08:18:07 +08:00
if raw:
2025-02-04 14:05:52 +08:00
content = f'{content}\n<code_interpreter type="code" lang="{lang}">\n{block["content"]}\n</code_interpreter>\n'
2025-02-04 08:18:07 +08:00
else:
2025-02-04 14:05:52 +08:00
content = f'{content}\n<details type="code_interpreter" done="false">\n<summary>Analyzing...</summary>\n```{lang}\n{block["content"]}\n```\n</details>\n'
2025-02-03 15:35:58 +08:00
2025-02-03 12:50:54 +08:00
else:
2025-02-03 14:38:19 +08:00
block_content = str(block["content"]).strip()
content = f"{content}{block['type']}: {block_content}\n"
2025-02-03 12:50:54 +08:00
2025-02-06 06:20:51 +08:00
return content.strip()
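# convert_content_blocks_to_messages: turns the accumulated content blocks back into
# OpenAI-style messages (assistant messages carrying tool_calls followed by "tool"
# role results) so the conversation can be resent to the model.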
2025-02-03 12:50:54 +08:00
2025-02-14 06:50:46 +08:00
def convert_content_blocks_to_messages(content_blocks):
messages = []
temp_blocks = []
for idx, block in enumerate(content_blocks):
if block["type"] == "tool_calls":
messages.append(
{
"role": "assistant",
"content": serialize_content_blocks(temp_blocks),
"tool_calls": block.get("content"),
}
)
results = block.get("results", [])
for result in results:
messages.append(
{
"role": "tool",
"tool_call_id": result["tool_call_id"],
"content": result["content"],
}
)
temp_blocks = []
else:
temp_blocks.append(block)
if temp_blocks:
2025-02-14 07:17:41 +08:00
content = serialize_content_blocks(temp_blocks)
if content:
messages.append(
{
"role": "assistant",
"content": content,
}
)
2025-02-14 06:50:46 +08:00
return messages
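# tag_content_handler: incrementally detects start/end tags (reasoning, code
# interpreter, solution) in the streamed content and splits it into typed content
# blocks, returning the cleaned content, the updated blocks, and an end-of-tag flag.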
2025-02-03 12:50:54 +08:00
def tag_content_handler(content_type, tags, content, content_blocks):
2025-02-03 16:24:09 +08:00
end_flag = False
2025-02-03 12:50:54 +08:00
def extract_attributes(tag_content):
"""Extract attributes from a tag if they exist."""
attributes = {}
2025-02-06 06:10:53 +08:00
if not tag_content: # Ensure tag_content is not None
return attributes
2025-02-03 12:50:54 +08:00
# Match attributes in the format: key="value" (ignores single quotes for simplicity)
matches = re.findall(r'(\w+)\s*=\s*"([^"]+)"', tag_content)
for key, value in matches:
attributes[key] = value
return attributes
if content_blocks[-1]["type"] == "text":
for start_tag, end_tag in tags:
2025-02-03 12:50:54 +08:00
# Match start tag e.g., <tag> or <tag attr="value">
start_tag_pattern = rf"<{re.escape(start_tag)}(\s.*?)?>"
2025-02-03 12:50:54 +08:00
match = re.search(start_tag_pattern, content)
if match:
2025-02-06 06:10:53 +08:00
attr_content = (
match.group(1) if match.group(1) else ""
) # Ensure it's not None
attributes = extract_attributes(
attr_content
) # Extract attributes safely
2025-02-06 06:20:51 +08:00
# Capture everything before and after the matched tag
before_tag = content[
: match.start()
] # Content before opening tag
after_tag = content[
match.end() :
] # Content after opening tag
# Remove the start tag and after from the currently handling text block
2025-02-03 12:50:54 +08:00
content_blocks[-1]["content"] = content_blocks[-1][
"content"
].replace(match.group(0) + after_tag, "")
2025-02-05 18:33:40 +08:00
2025-02-06 06:20:51 +08:00
if before_tag:
content_blocks[-1]["content"] = before_tag
2025-02-03 12:50:54 +08:00
if not content_blocks[-1]["content"]:
content_blocks.pop()
2025-02-05 18:10:28 +08:00
2025-02-03 12:50:54 +08:00
# Append the new block
content_blocks.append(
{
"type": content_type,
"start_tag": start_tag,
"end_tag": end_tag,
2025-02-03 12:50:54 +08:00
"attributes": attributes,
"content": "",
"started_at": time.time(),
}
)
2025-02-06 06:20:51 +08:00
if after_tag:
content_blocks[-1]["content"] = after_tag
2025-02-03 12:50:54 +08:00
break
elif content_blocks[-1]["type"] == content_type:
start_tag = content_blocks[-1]["start_tag"]
end_tag = content_blocks[-1]["end_tag"]
2025-02-03 12:50:54 +08:00
# Match end tag e.g., </tag>
end_tag_pattern = rf"<{re.escape(end_tag)}>"
2025-02-05 18:33:40 +08:00
2025-02-06 06:10:53 +08:00
# Check if the content has the end tag
2025-02-03 12:50:54 +08:00
if re.search(end_tag_pattern, content):
2025-02-05 18:38:05 +08:00
end_flag = True
2025-02-03 12:50:54 +08:00
block_content = content_blocks[-1]["content"]
# Strip start and end tags from the content
start_tag_pattern = rf"<{re.escape(start_tag)}(.*?)>"
2025-02-03 12:50:54 +08:00
block_content = re.sub(
start_tag_pattern, "", block_content
).strip()
2025-02-05 18:33:40 +08:00
end_tag_regex = re.compile(end_tag_pattern, re.DOTALL)
split_content = end_tag_regex.split(block_content, maxsplit=1)
# Content inside the tag
block_content = (
split_content[0].strip() if split_content else ""
)
# Leftover content (everything after `</tag>`)
leftover_content = (
split_content[1].strip() if len(split_content) > 1 else ""
)
2025-02-03 12:50:54 +08:00
if block_content:
content_blocks[-1]["content"] = block_content
content_blocks[-1]["ended_at"] = time.time()
content_blocks[-1]["duration"] = int(
content_blocks[-1]["ended_at"]
- content_blocks[-1]["started_at"]
)
2025-02-06 06:20:51 +08:00
2025-02-03 12:50:54 +08:00
# Reset the content_blocks by appending a new text block
2025-02-06 06:20:51 +08:00
if content_type != "code_interpreter":
if leftover_content:
content_blocks.append(
{
"type": "text",
"content": leftover_content,
}
)
2025-02-06 06:26:09 +08:00
else:
content_blocks.append(
{
"type": "text",
"content": "",
}
)
2025-02-03 12:50:54 +08:00
else:
# Remove the block if content is empty
content_blocks.pop()
2025-02-05 18:33:40 +08:00
if leftover_content:
content_blocks.append(
{
"type": "text",
"content": leftover_content,
}
)
2025-02-06 06:26:09 +08:00
else:
content_blocks.append(
{
"type": "text",
"content": "",
}
)
2025-02-05 18:33:40 +08:00
# Clean processed content
content = re.sub(
rf"<{re.escape(start_tag)}(.*?)>(.|\n)*?<{re.escape(end_tag)}>",
2025-02-05 18:33:40 +08:00
"",
content,
flags=re.DOTALL,
)
2025-02-03 16:24:09 +08:00
return content, content_blocks, end_flag
2025-02-03 12:50:54 +08:00
2024-12-29 11:31:03 +08:00
message = Chats.get_message_by_id_and_message_id(
metadata["chat_id"], metadata["message_id"]
)
2025-02-03 12:50:54 +08:00
2025-02-05 15:05:14 +08:00
tool_calls = []
2025-02-18 10:40:40 +08:00
2025-02-19 01:57:12 +08:00
last_assistant_message = None
try:
if form_data["messages"][-1]["role"] == "assistant":
last_assistant_message = get_last_assistant_message(
form_data["messages"]
)
except Exception as e:
pass
2025-02-18 10:40:40 +08:00
content = (
message.get("content", "")
if message
else last_assistant_message if last_assistant_message else ""
)
2025-02-03 12:50:54 +08:00
content_blocks = [
{
"type": "text",
"content": content,
}
]
# We might want to disable this by default
DETECT_REASONING = True
DETECT_SOLUTION = True
2025-02-05 10:33:22 +08:00
DETECT_CODE_INTERPRETER = metadata.get("features", {}).get(
"code_interpreter", False
)
2025-02-03 12:50:54 +08:00
2025-02-06 06:10:53 +08:00
reasoning_tags = [
("think", "/think"),
("thinking", "/thinking"),
("reason", "/reason"),
("reasoning", "/reasoning"),
("thought", "/thought"),
("Thought", "/Thought"),
2025-02-25 17:00:29 +08:00
("|begin_of_thought|", "|end_of_thought|"),
2025-02-06 06:10:53 +08:00
]
2025-02-25 17:00:29 +08:00
code_interpreter_tags = [("code_interpreter", "/code_interpreter")]
2025-02-25 17:00:29 +08:00
solution_tags = [("|begin_of_solution|", "|end_of_solution|")]
2024-12-28 14:46:50 +08:00
2024-12-19 17:00:32 +08:00
try:
for event in events:
await event_emitter(
{
2024-12-19 17:05:47 +08:00
"type": "chat:completion",
2024-12-19 17:00:32 +08:00
"data": event,
}
)
2024-12-22 00:45:52 +08:00
# Save message in the database
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
**event,
},
)
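# stream_body_handler: consumes the SSE body line by line, applies stream filters,
# merges tool-call deltas, and serializes the evolving content blocks back to the
# client via chat:completion events.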
2025-02-03 14:38:19 +08:00
async def stream_body_handler(response):
nonlocal content
nonlocal content_blocks
2024-12-19 17:00:32 +08:00
2025-02-05 15:05:14 +08:00
response_tool_calls = []
2025-02-03 14:38:19 +08:00
async for line in response.body_iterator:
line = line.decode("utf-8") if isinstance(line, bytes) else line
data = line
2024-12-19 17:00:32 +08:00
2025-02-03 14:38:19 +08:00
# Skip empty lines
if not data.strip():
continue
2024-12-19 17:00:32 +08:00
2025-02-03 14:38:19 +08:00
# "data:" is the prefix for each event
if not data.startswith("data:"):
continue
2024-12-19 17:00:32 +08:00
2025-02-03 14:38:19 +08:00
# Remove the prefix
data = data[len("data:") :].strip()
2024-12-19 17:00:32 +08:00
2025-02-03 14:38:19 +08:00
try:
data = json.loads(data)
2024-12-25 10:34:56 +08:00
2025-02-25 17:00:29 +08:00
data, _ = await process_filter_functions(
request=request,
2025-03-05 10:04:55 +08:00
filter_functions=filter_functions,
2025-02-25 17:00:29 +08:00
filter_type="stream",
form_data=data,
extra_params=extra_params,
)
2025-02-05 18:33:40 +08:00
2025-02-25 17:03:15 +08:00
if data:
if "event" in data:
await event_emitter(data.get("event", {}))
2025-02-25 17:03:15 +08:00
if "selected_model_id" in data:
model_id = data["selected_model_id"]
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"selectedModelId": model_id,
},
2025-02-03 12:50:54 +08:00
)
2025-02-25 17:03:15 +08:00
else:
choices = data.get("choices", [])
if not choices:
2025-03-28 15:25:00 +08:00
error = data.get("error", {})
if error:
await event_emitter(
{
"type": "chat:completion",
"data": {
"error": error,
},
}
)
2025-02-25 17:35:59 +08:00
usage = data.get("usage", {})
if usage:
await event_emitter(
{
"type": "chat:completion",
"data": {
"usage": usage,
},
}
)
2025-02-25 17:03:15 +08:00
continue
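# Merge incremental tool-call deltas: deltas that share an "index" belong to the
# same call, so their function name and argument fragments are concatenated onto
# the existing entry in response_tool_calls.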
2025-01-22 16:13:24 +08:00
2025-02-25 17:03:15 +08:00
delta = choices[0].get("delta", {})
delta_tool_calls = delta.get("tool_calls", None)
if delta_tool_calls:
for delta_tool_call in delta_tool_calls:
tool_call_index = delta_tool_call.get(
"index"
2025-02-03 16:24:09 +08:00
)
2025-02-03 14:38:19 +08:00
2025-02-25 17:03:15 +08:00
if tool_call_index is not None:
# Check if the tool call already exists
current_response_tool_call = None
for (
response_tool_call
) in response_tool_calls:
if (
response_tool_call.get("index")
== tool_call_index
):
current_response_tool_call = (
response_tool_call
)
break
if current_response_tool_call is None:
# Add the new tool call
2025-02-25 17:03:15 +08:00
response_tool_calls.append(
delta_tool_call
)
else:
# Update the existing tool call
2025-02-25 17:03:15 +08:00
delta_name = delta_tool_call.get(
"function", {}
).get("name")
delta_arguments = (
delta_tool_call.get(
"function", {}
).get("arguments")
)
if delta_name:
current_response_tool_call[
"function"
]["name"] += delta_name
2025-02-25 17:03:15 +08:00
if delta_arguments:
current_response_tool_call[
"function"
][
2025-02-25 17:03:15 +08:00
"arguments"
] += delta_arguments
value = delta.get("content")
2025-03-04 12:34:17 +08:00
2025-03-25 02:35:32 +08:00
reasoning_content = delta.get(
"reasoning_content"
) or delta.get("reasoning")
if reasoning_content:
2025-03-04 12:34:17 +08:00
if (
not content_blocks
or content_blocks[-1]["type"] != "reasoning"
):
reasoning_block = {
"type": "reasoning",
"start_tag": "think",
"end_tag": "/think",
2025-03-04 12:34:17 +08:00
"attributes": {
"type": "reasoning_content"
},
"content": "",
2025-03-04 12:34:17 +08:00
"started_at": time.time(),
}
content_blocks.append(reasoning_block)
else:
reasoning_block = content_blocks[-1]
2025-03-04 12:34:17 +08:00
reasoning_block["content"] += reasoning_content
2025-03-04 12:34:17 +08:00
data = {
2025-03-04 12:34:17 +08:00
"content": serialize_content_blocks(
content_blocks
)
}
2025-03-04 12:34:17 +08:00
2025-02-25 17:03:15 +08:00
if value:
2025-03-04 12:34:17 +08:00
if (
content_blocks
and content_blocks[-1]["type"]
== "reasoning"
and content_blocks[-1]
.get("attributes", {})
.get("type")
== "reasoning_content"
):
reasoning_block = content_blocks[-1]
reasoning_block["ended_at"] = time.time()
2025-03-04 12:34:17 +08:00
reasoning_block["duration"] = int(
reasoning_block["ended_at"]
- reasoning_block["started_at"]
)
2025-02-25 17:03:15 +08:00
content_blocks.append(
{
"type": "text",
"content": "",
}
)
2025-03-04 12:34:17 +08:00
content = f"{content}{value}"
2025-02-25 17:03:15 +08:00
if not content_blocks:
content_blocks.append(
{
"type": "text",
"content": "",
}
2025-02-03 16:24:09 +08:00
)
2025-02-25 17:03:15 +08:00
content_blocks[-1]["content"] = (
content_blocks[-1]["content"] + value
2025-02-03 14:38:19 +08:00
)
2025-02-25 17:03:15 +08:00
if DETECT_REASONING:
content, content_blocks, _ = (
tag_content_handler(
"reasoning",
reasoning_tags,
content,
content_blocks,
)
)
2025-02-03 16:24:09 +08:00
2025-02-25 17:03:15 +08:00
if DETECT_CODE_INTERPRETER:
content, content_blocks, end = (
tag_content_handler(
"code_interpreter",
code_interpreter_tags,
content,
content_blocks,
)
)
2025-02-03 16:24:09 +08:00
2025-02-25 17:03:15 +08:00
if end:
break
if DETECT_SOLUTION:
content, content_blocks, _ = (
tag_content_handler(
"solution",
solution_tags,
content,
content_blocks,
)
)
if ENABLE_REALTIME_CHAT_SAVE:
# Save message in the database
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"content": serialize_content_blocks(
content_blocks
),
},
)
else:
data = {
2025-02-03 14:38:19 +08:00
"content": serialize_content_blocks(
content_blocks
),
2025-02-25 17:03:15 +08:00
}
2025-02-03 14:38:19 +08:00
2025-02-25 17:03:15 +08:00
await event_emitter(
{
"type": "chat:completion",
"data": data,
}
)
2025-02-03 14:38:19 +08:00
except Exception as e:
done = "data: [DONE]" in line
if done:
pass
else:
log.debug("Error: ", e)
continue
2025-02-05 18:10:28 +08:00
if content_blocks:
# Clean up the last text block
if content_blocks[-1]["type"] == "text":
content_blocks[-1]["content"] = content_blocks[-1][
"content"
].strip()
if not content_blocks[-1]["content"]:
content_blocks.pop()
2025-02-03 16:24:09 +08:00
2025-02-05 18:10:28 +08:00
if not content_blocks:
content_blocks.append(
{
"type": "text",
"content": "",
}
)
2025-02-03 16:24:09 +08:00
2025-02-05 15:05:14 +08:00
if response_tool_calls:
tool_calls.append(response_tool_calls)
if response.background:
await response.background()
await stream_body_handler(response)
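# Native function calling: drain the queued tool calls, execute each tool, feed the
# results back to the model as tool messages, and continue streaming, bounded by
# MAX_TOOL_CALL_RETRIES.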
2025-04-03 14:49:42 +08:00
MAX_TOOL_CALL_RETRIES = 10
2025-02-05 15:05:14 +08:00
tool_call_retries = 0
while len(tool_calls) > 0 and tool_call_retries < MAX_TOOL_CALL_RETRIES:
tool_call_retries += 1
response_tool_calls = tool_calls.pop(0)
content_blocks.append(
{
"type": "tool_calls",
"content": response_tool_calls,
}
)
2025-02-04 08:21:44 +08:00
await event_emitter(
{
"type": "chat:completion",
"data": {
"content": serialize_content_blocks(content_blocks),
},
}
)
2025-02-05 15:05:14 +08:00
tools = metadata.get("tools", {})
2025-02-03 14:38:19 +08:00
2025-02-05 15:05:14 +08:00
results = []
for tool_call in response_tool_calls:
tool_call_id = tool_call.get("id", "")
tool_name = tool_call.get("function", {}).get("name", "")
tool_function_params = {}
try:
2025-02-05 16:01:24 +08:00
# json.loads cannot be used because some models do not produce valid JSON
tool_function_params = ast.literal_eval(
2025-02-05 15:05:14 +08:00
tool_call.get("function", {}).get("arguments", "{}")
)
except Exception as e:
log.debug(e)
2025-03-30 17:39:23 +08:00
# Fallback to JSON parsing
try:
tool_function_params = json.loads(
tool_call.get("function", {}).get("arguments", "{}")
)
except Exception as e:
log.debug(
f"Error parsing tool call arguments: {tool_call.get('function', {}).get('arguments', '{}')}"
)
2025-02-05 15:05:14 +08:00
tool_result = None
if tool_name in tools:
tool = tools[tool_name]
spec = tool.get("spec", {})
try:
2025-03-08 04:37:22 +08:00
allowed_params = (
spec.get("parameters", {})
.get("properties", {})
.keys()
2025-02-05 15:05:14 +08:00
)
2025-03-27 17:50:53 +08:00
2025-02-05 15:05:14 +08:00
tool_function_params = {
k: v
for k, v in tool_function_params.items()
if k in allowed_params
2025-02-05 15:05:14 +08:00
}
2025-03-26 15:40:24 +08:00
if tool.get("direct", False):
tool_result = await event_caller(
{
"type": "execute:tool",
"data": {
"id": str(uuid4()),
2025-03-27 17:27:56 +08:00
"name": tool_name,
2025-03-26 15:40:24 +08:00
"params": tool_function_params,
"server": tool.get("server", {}),
2025-03-26 15:40:24 +08:00
"session_id": metadata.get(
"session_id", None
),
},
}
)
2025-03-27 17:50:53 +08:00
else:
tool_function = tool["callable"]
tool_result = await tool_function(
**tool_function_params
)
2025-02-05 15:05:14 +08:00
except Exception as e:
tool_result = str(e)
2025-04-03 14:46:39 +08:00
tool_result_files = []
if isinstance(tool_result, list):
for item in tool_result:
2025-04-05 19:38:46 +08:00
# check if string
if isinstance(item, str) and item.startswith("data:"):
2025-04-03 14:46:39 +08:00
tool_result_files.append(item)
tool_result.remove(item)
2025-03-30 15:44:09 +08:00
if isinstance(tool_result, dict) or isinstance(
tool_result, list
):
tool_result = json.dumps(tool_result, indent=2)
results.append(
{
"tool_call_id": tool_call_id,
"content": tool_result,
**(
{"files": tool_result_files}
if tool_result_files
else {}
),
}
)
content_blocks[-1]["results"] = results
content_blocks.append(
{
"type": "text",
"content": "",
}
)
await event_emitter(
{
"type": "chat:completion",
"data": {
"content": serialize_content_blocks(content_blocks),
},
}
)
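# Re-run the completion with the accumulated content blocks (including the
# tool results) appended to the conversation; streamed output is consumed by
# stream_body_handler, which may queue further tool calls for the next pass.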
try:
res = await generate_chat_completion(
request,
{
"model": model_id,
"stream": True,
"tools": form_data["tools"],
"messages": [
*form_data["messages"],
*convert_content_blocks_to_messages(content_blocks),
],
},
user,
)
if isinstance(res, StreamingResponse):
await stream_body_handler(res)
else:
break
except Exception as e:
log.debug(e)
break
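# Code-interpreter loop: while the latest content block is a code_interpreter
# block, run its code (via the pyodide event or a Jupyter kernel, depending on
# configuration), attach the output, and ask the model to continue.
# Capped at MAX_RETRIES attempts.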
if DETECT_CODE_INTERPRETER:
MAX_RETRIES = 5
retries = 0
while (
content_blocks[-1]["type"] == "code_interpreter"
and retries < MAX_RETRIES
):
await event_emitter(
{
"type": "chat:completion",
"data": {
"content": serialize_content_blocks(content_blocks),
},
}
)
retries += 1
log.debug(f"Attempt count: {retries}")
output = ""
try:
if content_blocks[-1]["attributes"].get("type") == "code":
code = content_blocks[-1]["content"]
if (
request.app.state.config.CODE_INTERPRETER_ENGINE
== "pyodide"
):
output = await event_caller(
{
"type": "execute:python",
"data": {
"id": str(uuid4()),
"code": code,
"session_id": metadata.get(
"session_id", None
),
},
}
)
elif (
request.app.state.config.CODE_INTERPRETER_ENGINE
== "jupyter"
):
output = await execute_code_jupyter(
request.app.state.config.CODE_INTERPRETER_JUPYTER_URL,
code,
(
request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH_TOKEN
if request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH
== "token"
else None
),
(
request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH_PASSWORD
if request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH
== "password"
else None
),
request.app.state.config.CODE_INTERPRETER_JUPYTER_TIMEOUT,
)
else:
output = {
"stdout": "Code interpreter engine not configured."
}
log.debug(f"Code interpreter output: {output}")
if isinstance(output, dict):
stdout = output.get("stdout", "")
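# Replace inline base64 PNGs in stdout with markdown image links,
# saving the decoded images under CACHE_DIR/images so the client can render them.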
if isinstance(stdout, str):
stdoutLines = stdout.split("\n")
for idx, line in enumerate(stdoutLines):
if "data:image/png;base64" in line:
id = str(uuid4())
# ensure the path exists
os.makedirs(
os.path.join(CACHE_DIR, "images"),
exist_ok=True,
)
image_path = os.path.join(
CACHE_DIR,
f"images/{id}.png",
)
with open(image_path, "wb") as f:
f.write(
base64.b64decode(
line.split(",")[1]
)
)
stdoutLines[idx] = (
f"![Output Image {idx}](/cache/images/{id}.png)"
)
output["stdout"] = "\n".join(stdoutLines)
result = output.get("result", "")
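# Apply the same base64-image extraction to the interpreter's result field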
if isinstance(result, str):
resultLines = result.split("\n")
for idx, line in enumerate(resultLines):
if "data:image/png;base64" in line:
id = str(uuid4())
# ensure the path exists
os.makedirs(
os.path.join(CACHE_DIR, "images"),
exist_ok=True,
)
image_path = os.path.join(
CACHE_DIR,
f"images/{id}.png",
)
with open(image_path, "wb") as f:
f.write(
base64.b64decode(
line.split(",")[1]
)
)
resultLines[idx] = (
f"![Output Image {idx}](/cache/images/{id}.png)"
)
output["result"] = "\n".join(resultLines)
except Exception as e:
output = str(e)
content_blocks[-1]["output"] = output
content_blocks.append(
{
"type": "text",
"content": "",
}
)
await event_emitter(
{
"type": "chat:completion",
"data": {
"content": serialize_content_blocks(content_blocks),
},
}
)
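# Continue the conversation with the serialized content blocks (raw form,
# including the code output) as the assistant message so far.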
try:
res = await generate_chat_completion(
request,
{
"model": model_id,
"stream": True,
"messages": [
*form_data["messages"],
{
"role": "assistant",
"content": serialize_content_blocks(
content_blocks, raw=True
),
},
],
},
user,
)
if isinstance(res, StreamingResponse):
await stream_body_handler(res)
else:
break
except Exception as e:
log.debug(e)
break
title = Chats.get_chat_title_by_id(metadata["chat_id"])
data = {
"done": True,
"content": serialize_content_blocks(content_blocks),
"title": title,
}
if not ENABLE_REALTIME_CHAT_SAVE:
# Save message in the database
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"content": serialize_content_blocks(content_blocks),
},
)
# Send a webhook notification if the user is not active
if get_active_status_by_user_id(user.id) is None:
webhook_url = Users.get_user_webhook_url_by_id(user.id)
if webhook_url:
post_webhook(
request.app.state.WEBUI_NAME,
webhook_url,
f"{title} - {request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}\n\n{content}",
{
"action": "chat",
"message": content,
"title": title,
"url": f"{request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}",
},
)
await event_emitter(
{
"type": "chat:completion",
"data": data,
}
)
await background_tasks_handler()
except asyncio.CancelledError:
log.warning("Task was cancelled!")
await event_emitter({"type": "task-cancelled"})
if not ENABLE_REALTIME_CHAT_SAVE:
# Save message in the database
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"content": serialize_content_blocks(content_blocks),
},
)
if response.background is not None:
await response.background()
# background_tasks.add_task(post_response_handler, response, events)
task_id, _ = create_task(post_response_handler(response, events))
return {"status": True, "task_id": task_id}
else:
# Fallback to the original response
async def stream_wrapper(original_generator, events):
def wrap_item(item):
return f"data: {item}\n\n"
for event in events:
event, _ = await process_filter_functions(
request=request,
filter_functions=filter_functions,
filter_type="stream",
form_data=event,
extra_params=extra_params,
)
if event:
yield wrap_item(json.dumps(event))
async for data in original_generator:
data, _ = await process_filter_functions(
request=request,
filter_functions=filter_functions,
filter_type="stream",
form_data=data,
extra_params=extra_params,
)
if data:
yield data
return StreamingResponse(
stream_wrapper(response.body_iterator, events),
headers=dict(response.headers),
background=response.background,
)