open-webui/backend/open_webui/utils/misc.py

533 lines
16 KiB
Python
Raw Normal View History

2023-11-19 13:41:43 +08:00
import hashlib
2024-01-03 08:22:48 +08:00
import re
import threading
2024-06-20 19:38:59 +08:00
import time
2024-08-28 06:10:27 +08:00
import uuid
import logging
2024-08-28 06:10:27 +08:00
from datetime import timedelta
from pathlib import Path
from typing import Callable, Optional
import json
2024-06-09 18:01:25 +08:00
2024-09-07 10:09:57 +08:00
2025-02-05 15:05:14 +08:00
import collections.abc
from open_webui.env import SRC_LOG_LEVELS
2025-02-05 15:05:14 +08:00
# Module-level logger; its level is taken from the "MAIN" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])
2025-02-05 15:05:14 +08:00
def deep_update(d, u):
    """
    Recursively merge mapping ``u`` into ``d`` in place and return ``d``.

    Mapping values in ``u`` are merged key by key into the matching value of
    ``d``; every other value simply overwrites the entry in ``d``.

    :param d: Destination dict, mutated in place.
    :param u: Mapping holding the updates to apply.
    :return: The same ``d`` object after the merge.
    """
    for k, v in u.items():
        if isinstance(v, collections.abc.Mapping):
            existing = d.get(k)
            # The current value may be missing or a non-mapping (None, str, ...).
            # Recursing into it would fail on item assignment, so merge into a
            # fresh dict instead.
            if not isinstance(existing, collections.abc.Mapping):
                existing = {}
            d[k] = deep_update(existing, v)
        else:
            d[k] = v
    return d
2025-09-14 16:26:46 +08:00
def get_message_list(messages_map, message_id):
    """
    Reconstructs a list of messages in order up to the specified message_id.

    :param messages_map: Message history dict mapping message IDs to messages
    :param message_id: ID of the message to reconstruct the chain
    :return: List of ordered messages starting from the root to the given message
    """
    # Return an empty list (never None) so callers can always iterate the result
    if not messages_map:
        return []

    # Find the message by its id
    current_message = messages_map.get(message_id)
    if not current_message:
        return []

    # Follow the parentId links from the target message up to the root.
    # Appending and reversing once avoids the quadratic cost of inserting at
    # index 0 on every step.
    message_list = []
    visited_ids = {message_id}
    while current_message:
        message_list.append(current_message)
        parent_id = current_message.get("parentId")  # Use .get() for safety
        # A parentId that was already visited means the links form a cycle;
        # stop there instead of looping forever.
        if not parent_id or parent_id in visited_ids:
            break
        visited_ids.add(parent_id)
        current_message = messages_map.get(parent_id)

    message_list.reverse()
    return message_list
2024-09-07 10:09:57 +08:00
def get_messages_content(messages: list[dict]) -> str:
    """
    Render a conversation as text, one ``ROLE: content`` entry per message.

    :param messages: The list of message dictionaries.
    :return: The entries joined with newlines; the role is upper-cased.
    """
    rendered = []
    for entry in messages:
        role_label = entry["role"].upper()
        rendered.append(f"{role_label}: {get_content_from_message(entry)}")
    return "\n".join(rendered)
2024-08-03 21:24:26 +08:00
2024-06-09 18:01:25 +08:00
2024-08-14 20:46:31 +08:00
def get_last_user_message_item(messages: list[dict]) -> Optional[dict]:
    """
    Return the most recent message whose role is ``"user"``.

    :param messages: The list of message dictionaries.
    :return: The last user message dict, or None when there is none.
    """
    # Scan from the end so the first hit is the latest user message
    user_messages = (entry for entry in reversed(messages) if entry["role"] == "user")
    return next(user_messages, None)
def get_content_from_message(message: dict) -> Optional[str]:
    """
    Extract the text content of a message.

    :param message: A message dictionary.
    :return: The ``content`` value as-is when it is not a list; for list
        content, the ``text`` of the first item of type ``"text"``, or None
        when no such item exists.
    """
    body = message.get("content")
    if not isinstance(body, list):
        return body

    # List content: only the first text part is returned
    for part in body:
        if part["type"] == "text":
            return part["text"]
    return None
2024-08-14 20:46:31 +08:00
def get_last_user_message(messages: list[dict]) -> Optional[str]:
    """
    Return the text content of the most recent user message.

    :param messages: The list of message dictionaries.
    :return: The extracted content, or None when there is no user message.
    """
    last_user = get_last_user_message_item(messages)
    return None if last_user is None else get_content_from_message(last_user)
2024-12-25 14:45:21 +08:00
def get_last_assistant_message_item(messages: list[dict]) -> Optional[dict]:
    """
    Return the most recent message whose role is ``"assistant"``.

    :param messages: The list of message dictionaries.
    :return: The last assistant message dict, or None when there is none.
    """
    # Scan from the end so the first hit is the latest assistant message
    assistant_messages = (
        entry for entry in reversed(messages) if entry["role"] == "assistant"
    )
    return next(assistant_messages, None)
2024-08-14 20:46:31 +08:00
def get_last_assistant_message(messages: list[dict]) -> Optional[str]:
    """
    Return the text content of the most recent assistant message.

    :param messages: The list of message dictionaries.
    :return: The extracted content, or None when there is no assistant message.
    """
    latest = next(
        (entry for entry in reversed(messages) if entry["role"] == "assistant"),
        None,
    )
    if latest is None:
        return None
    return get_content_from_message(latest)
2024-08-14 20:46:31 +08:00
def get_system_message(messages: list[dict]) -> Optional[dict]:
    """
    Return the first message whose role is ``"system"``.

    :param messages: The list of message dictionaries.
    :return: The first system message dict, or None when there is none.
    """
    system_messages = (entry for entry in messages if entry["role"] == "system")
    return next(system_messages, None)
2024-08-14 20:46:31 +08:00
def remove_system_message(messages: list[dict]) -> list[dict]:
    """
    Return a new list containing every message whose role is not ``"system"``.

    :param messages: The list of message dictionaries (left unmodified).
    :return: A new list with the non-system messages in their original order.
    """
    kept = []
    for entry in messages:
        if entry["role"] != "system":
            kept.append(entry)
    return kept
2024-08-14 20:58:37 +08:00
def pop_system_message(messages: list[dict]) -> tuple[Optional[dict], list[dict]]:
    """
    Split a conversation into its system message and the remaining messages.

    :param messages: The list of message dictionaries.
    :return: A tuple of (first system message or None, new list holding all
        non-system messages).
    """
    system_message = get_system_message(messages)
    remaining = remove_system_message(messages)
    return system_message, remaining
2025-09-24 22:38:14 +08:00
def update_message_content(message: dict, content: str, append: bool = True) -> dict:
    """
    Add ``content`` to a message's existing content, in place.

    For list content every item of type ``"text"`` is updated; other items
    are left alone. Otherwise the ``content`` value itself is replaced.

    :param message: The message dictionary to modify.
    :param content: The text to add, separated from the old text by a newline.
    :param append: Put ``content`` after the old text when True, before it
        when False.
    :return: The same ``message`` object.
    """

    def combine(existing):
        # Old and new text are always joined with a single newline
        return f"{existing}\n{content}" if append else f"{content}\n{existing}"

    current = message["content"]
    if isinstance(current, list):
        for part in current:
            if part["type"] == "text":
                part["text"] = combine(part["text"])
    else:
        message["content"] = combine(current)
    return message
2024-07-26 19:22:13 +08:00
2025-05-23 07:26:14 +08:00
def add_or_update_system_message(
    content: str, messages: list[dict], append: bool = False
):
    """
    Makes sure the messages list starts with a system message carrying
    the given content. The list is modified in place.

    :param content: The text to add to, or to use as, the system message.
    :param messages: The list of message dictionaries.
    :param append: When a leading system message exists, add the content after
        its current text if True, before it if False.
    :return: The updated list of message dictionaries.
    """
    starts_with_system = bool(messages) and messages[0].get("role") == "system"
    if not starts_with_system:
        # No leading system message yet: insert one at the beginning
        messages.insert(0, {"role": "system", "content": content})
        return messages

    messages[0] = update_message_content(messages[0], content, append)
    return messages
2023-11-19 13:41:43 +08:00
2024-06-20 19:38:59 +08:00
2025-09-24 22:38:14 +08:00
def add_or_update_user_message(content: str, messages: list[dict], append: bool = True):
    """
    Makes sure the messages list ends with a user message carrying
    the given content. The list is modified in place.

    :param content: The text to add to, or to use as, the user message.
    :param messages: The list of message dictionaries.
    :param append: When a trailing user message exists, add the content after
        its current text if True, before it if False.
    :return: The updated list of message dictionaries.
    """
    ends_with_user = bool(messages) and messages[-1].get("role") == "user"
    if not ends_with_user:
        # No trailing user message yet: add one at the end
        messages.append({"role": "user", "content": content})
        return messages

    messages[-1] = update_message_content(messages[-1], content, append)
    return messages
2025-09-24 22:38:14 +08:00
def prepend_to_first_user_message_content(
    content: str, messages: list[dict]
) -> list[dict]:
    """
    Puts the given content in front of the first user message's content.
    Only the first user message is touched; the list is returned unchanged
    when it holds no user message.

    :param content: The text to place before the existing content.
    :param messages: The list of message dictionaries.
    :return: The same list of message dictionaries.
    """
    first_user = next((entry for entry in messages if entry["role"] == "user"), None)
    if first_user is not None:
        # update_message_content edits the dict in place
        update_message_content(first_user, content, append=False)
    return messages
2025-02-03 07:11:50 +08:00
def append_or_update_assistant_message(content: str, messages: list[dict]):
    """
    Adds a new assistant message at the end of the messages list
    or updates the existing assistant message at the end.
    The list is modified in place.

    :param content: The text to append to, or to use as, the assistant message.
    :param messages: The list of message dictionaries.
    :return: The updated list of message dictionaries.
    """
    if messages and messages[-1].get("role") == "assistant":
        # Delegate to the shared helper (as the system/user variants do) so
        # list-style content gets the text appended to its text parts instead
        # of being turned into the string repr of the list.
        messages[-1] = update_message_content(messages[-1], content, True)
    else:
        # Insert at the end
        messages.append({"role": "assistant", "content": content})
    return messages
2024-08-01 05:01:22 +08:00
def openai_chat_message_template(model: str):
    """
    Build the skeleton dict shared by the chat completion response templates.

    :param model: Model name; used for the ``model`` field and as the id prefix.
    :return: Dict with ``id`` (model name plus a random UUID), ``created``
        (current time in whole seconds), ``model`` and a single-entry
        ``choices`` list.
    """
    unique_suffix = str(uuid.uuid4())
    template = {}
    template["id"] = f"{model}-{unique_suffix}"
    template["created"] = int(time.time())
    template["model"] = model
    template["choices"] = [{"index": 0, "logprobs": None, "finish_reason": None}]
    return template
2024-07-31 22:26:26 +08:00
2024-09-21 07:07:57 +08:00
def openai_chat_chunk_message_template(
    model: str,
    content: Optional[str] = None,
    reasoning_content: Optional[str] = None,
    tool_calls: Optional[list[dict]] = None,
    usage: Optional[dict] = None,
) -> dict:
    """
    Build a ``chat.completion.chunk`` dict for one streamed chunk.

    :param model: Model name passed to the base template.
    :param content: Text for ``delta.content``; left out when falsy.
    :param reasoning_content: Text for ``delta.reasoning_content``; left out
        when falsy.
    :param tool_calls: Value for ``delta.tool_calls``; left out when falsy.
    :param usage: Usage dict stored under ``usage``; left out when falsy.
    :return: The chunk dict. ``finish_reason`` is set to ``"stop"`` when the
        delta ends up empty.
    """
    chunk = openai_chat_message_template(model)
    chunk["object"] = "chat.completion.chunk"

    choice = chunk["choices"][0]
    choice["index"] = 0

    # Only truthy fields make it into the delta, in this fixed order
    candidate_fields = (
        ("content", content),
        ("reasoning_content", reasoning_content),
        ("tool_calls", tool_calls),
    )
    delta = {name: value for name, value in candidate_fields if value}
    choice["delta"] = delta

    # A chunk that carries nothing is marked as finished
    if not delta:
        choice["finish_reason"] = "stop"

    if usage:
        chunk["usage"] = usage
    return chunk
2024-09-21 07:07:57 +08:00
def openai_chat_completion_message_template(
    model: str,
    message: Optional[str] = None,
    reasoning_content: Optional[str] = None,
    tool_calls: Optional[list[dict]] = None,
    usage: Optional[dict] = None,
) -> dict:
    """
    Build a non-streaming ``chat.completion`` dict.

    :param model: Model name passed to the base template.
    :param message: Assistant text; the ``message`` entry is only added when
        this is not None.
    :param reasoning_content: Added to the message entry when truthy.
    :param tool_calls: Added to the message entry when truthy.
    :param usage: Usage dict stored under ``usage``; left out when falsy.
    :return: The completion dict, always with ``finish_reason`` ``"stop"``.
    """
    completion = openai_chat_message_template(model)
    completion["object"] = "chat.completion"

    choice = completion["choices"][0]
    if message is not None:
        assistant_message = {"role": "assistant", "content": message}
        if reasoning_content:
            assistant_message["reasoning_content"] = reasoning_content
        if tool_calls:
            assistant_message["tool_calls"] = tool_calls
        choice["message"] = assistant_message

    choice["finish_reason"] = "stop"

    if usage:
        completion["usage"] = usage
    return completion
2024-07-31 22:26:26 +08:00
2023-11-19 13:41:43 +08:00
def get_gravatar_url(email):
    """
    Build the Gravatar avatar URL for an email address.

    :param email: The email address; converted with ``str()`` first.
    :return: URL containing the SHA-256 hex digest of the trimmed,
        lower-cased address, with the ``d=mp`` query parameter.
    """
    # Normalise first: surrounding whitespace and letter case must not
    # influence the hash
    normalized = str(email).strip().lower()
    digest = hashlib.sha256(normalized.encode()).hexdigest()
    return f"https://www.gravatar.com/avatar/{digest}?d=mp"
2023-12-24 07:38:52 +08:00
2025-02-08 12:52:24 +08:00
def calculate_sha256(file_path, chunk_size):
    """
    Compute the SHA-256 hex digest of a file, reading it piece by piece.

    :param file_path: Path of the file to hash; opened in binary mode.
    :param chunk_size: Number of bytes requested per read.
    :return: The hexadecimal digest string.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()
2024-01-03 08:22:48 +08:00
2024-01-27 14:17:28 +08:00
def calculate_sha256_string(string):
    """
    Compute the SHA-256 hex digest of a string.

    :param string: The text to hash; encoded as UTF-8 before hashing.
    :return: The hexadecimal digest string.
    """
    return hashlib.sha256(string.encode("utf-8")).hexdigest()
2024-01-03 08:22:48 +08:00
def validate_email_format(email: str) -> bool:
    """
    Loosely check that a string looks like an email address.

    Addresses ending in ``@localhost`` are always accepted. Anything else
    must have the shape ``<local>@<domain>.<tld>`` with exactly one ``@``.

    :param email: The address to check.
    :return: True when the format is acceptable, False otherwise.
    """
    if email.endswith("@localhost"):
        return True
    # fullmatch, not match: match only anchors at the start, which let
    # strings with a valid prefix (e.g. "a@b.com@evil") slip through.
    return bool(re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email))
2024-02-18 13:06:08 +08:00
def sanitize_filename(file_name):
    """
    Turn a file name into a lower-case, dash-separated identifier.

    :param file_name: The original file name.
    :return: The lower-cased name with every character that is neither a
        word character nor whitespace removed, and each whitespace run
        replaced by a single dash.
    """
    without_specials = re.sub(r"[^\w\s]", "", file_name.lower())
    return re.sub(r"\s+", "-", without_specials)
def extract_folders_after_data_docs(path):
    """
    List the cumulative folder paths that follow ``data`` ... ``docs`` in a path.

    The first ``data`` component is located, then the first ``docs``
    component after it. The last path component is treated as the file name
    and ignored.

    :param path: A path string or Path object.
    :return: One entry per folder level, e.g. ``["a", "a/b"]`` for
        ``data/docs/a/b/file.txt``; an empty list when the ``data``/``docs``
        pair is not present.
    """
    segments = Path(path).parts
    try:
        after_data = segments.index("data") + 1
        after_docs = segments.index("docs", after_data) + 1
    except ValueError:
        # Either marker is missing from the path
        return []

    # Everything between "docs" and the final component is a folder
    folder_chain = segments[after_docs:-1]
    return [
        "/".join(folder_chain[:depth]) for depth in range(1, len(folder_chain) + 1)
    ]
2024-02-20 12:44:00 +08:00
def parse_duration(duration: str) -> Optional[timedelta]:
    """
    Convert a duration string such as ``"1h30m"`` into a timedelta.

    Every ``<number><unit>`` pair found in the string is summed. Supported
    units: ``ms``, ``s``, ``m``, ``h``, ``d``, ``w``. Numbers may be negative
    and may have a fractional part.

    :param duration: The duration string; ``"-1"`` and ``"0"`` mean "no duration".
    :return: The total duration, or None for ``"-1"`` / ``"0"``.
    :raises ValueError: If the string contains no number/unit pair.
    """
    if duration in ("-1", "0"):
        return None

    # Unit suffix -> timedelta keyword argument
    unit_keywords = {
        "ms": "milliseconds",
        "s": "seconds",
        "m": "minutes",
        "h": "hours",
        "d": "days",
        "w": "weeks",
    }

    found = re.findall(r"(-?\d+(\.\d+)?)(ms|s|m|h|d|w)", duration)
    if not found:
        raise ValueError("Invalid duration string")

    total = timedelta()
    for amount, _fraction, unit in found:
        total += timedelta(**{unit_keywords[unit]: float(amount)})
    return total
2024-05-25 10:26:27 +08:00
def parse_ollama_modelfile(model_text):
    """
    Parse the text of an Ollama Modelfile into a base model id and parameters.

    The FROM, TEMPLATE, PARAMETER, ADAPTER, SYSTEM and MESSAGE instructions
    are recognised. PARAMETER values are converted to the type listed in
    ``parameters_meta``; a value that cannot be converted is logged and skipped.

    :param model_text: The raw Modelfile contents.
    :return: Dict with ``base_model_id`` (str or None) and ``params`` (dict
        that may hold ``template``, ``stop``, typed parameters, ``adapter``,
        ``system`` and ``messages``).
    """
    parameters_meta = {
        "mirostat": int,
        "mirostat_eta": float,
        "mirostat_tau": float,
        "num_ctx": int,
        "repeat_last_n": int,
        "repeat_penalty": float,
        "temperature": float,
        "seed": int,
        "tfs_z": float,
        "num_predict": int,
        "top_k": int,
        "top_p": float,
        "num_keep": int,
        "typical_p": float,
        "presence_penalty": float,
        "frequency_penalty": float,
        "penalize_newline": bool,
        "numa": bool,
        "num_batch": int,
        "num_gpu": int,
        "main_gpu": int,
        "low_vram": bool,
        "f16_kv": bool,
        "vocab_only": bool,
        "use_mmap": bool,
        "use_mlock": bool,
        "num_thread": int,
    }

    data = {"base_model_id": None, "params": {}}

    # Parse base model. \S+ keeps the whole reference: \w+ stopped at the
    # first ".", ":", "-" or "/" (so "llama3.1:8b" became "llama3") and did
    # not match path-style references at all.
    base_model_match = re.search(
        r"^FROM\s+(\S+)", model_text, re.MULTILINE | re.IGNORECASE
    )
    if base_model_match:
        data["base_model_id"] = base_model_match.group(1)

    # Parse template
    template_match = re.search(
        r'TEMPLATE\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE
    )
    if template_match:
        data["params"] = {"template": template_match.group(1).strip()}

    # Parse stops (may occur several times, so all of them are collected)
    stops = re.findall(r'PARAMETER stop "(.*?)"', model_text, re.IGNORECASE)
    if stops:
        data["params"]["stop"] = stops

    # Parse other parameters from the provided list
    for param, param_type in parameters_meta.items():
        param_match = re.search(rf"PARAMETER {param} (.+)", model_text, re.IGNORECASE)
        if param_match:
            value = param_match.group(1)

            try:
                if param_type is int:
                    value = int(value)
                elif param_type is float:
                    value = float(value)
                elif param_type is bool:
                    value = value.lower() == "true"
            except ValueError as e:
                # int()/float() raise ValueError on malformed text; skip the
                # parameter rather than failing the whole parse.
                log.exception(f"Failed to parse parameter {param}: {e}")
                continue

            data["params"][param] = value

    # Parse adapter
    adapter_match = re.search(r"ADAPTER (.+)", model_text, re.IGNORECASE)
    if adapter_match:
        data["params"]["adapter"] = adapter_match.group(1)

    # Parse system description: the triple-quoted form wins over the
    # single-line form
    system_desc_match = re.search(
        r'SYSTEM\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE
    )
    system_desc_match_single = re.search(
        r"SYSTEM\s+([^\n]+)", model_text, re.IGNORECASE
    )

    if system_desc_match:
        data["params"]["system"] = system_desc_match.group(1).strip()
    elif system_desc_match_single:
        data["params"]["system"] = system_desc_match_single.group(1).strip()

    # Parse messages
    messages = []
    message_matches = re.findall(r"MESSAGE (\w+) (.+)", model_text, re.IGNORECASE)
    for role, content in message_matches:
        messages.append({"role": role, "content": content})

    if messages:
        data["params"]["messages"] = messages

    return data
2025-03-01 23:28:00 +08:00
def convert_logit_bias_input_to_json(user_input):
    """
    Convert a ``token:bias`` list such as ``"123:50, 456:-20"`` to a JSON object string.

    :param user_input: Comma-separated ``token:bias`` pairs; whitespace
        around tokens and biases is ignored.
    :return: JSON string mapping each token (as a string) to its integer
        bias, clamped to the range -100..100.
    :raises ValueError: If a pair does not have exactly one ``:`` or the
        bias is not an integer.
    """
    clamped = {}
    for entry in user_input.split(","):
        raw_token, raw_bias = entry.split(":")
        # Keep the bias inside [-100, 100]
        clamped[str(raw_token.strip())] = max(-100, min(100, int(raw_bias.strip())))
    return json.dumps(clamped)
def freeze(value):
    """
    Convert dicts and lists, recursively, into hashable equivalents.

    A dict becomes a frozenset of ``(key, frozen value)`` pairs and a list
    becomes a tuple of frozen items. Any other value is returned unchanged.
    """
    if isinstance(value, dict):
        frozen_pairs = ((key, freeze(item)) for key, item in value.items())
        return frozenset(frozen_pairs)
    if isinstance(value, list):
        return tuple(freeze(item) for item in value)
    return value
2025-08-17 08:06:16 +08:00
def throttle(interval: float = 10.0):
    """
    Decorator to prevent a function from being called more than once within a specified duration.
    If the function is called again within the duration, it returns None. To avoid returning
    different types, the return type of the function should be Optional[T].

    Calls are tracked separately per argument combination, so positional
    arguments must be hashable. Passing ``interval=None`` disables throttling.

    :param interval: Duration in seconds to wait before allowing the function to be called again.
    """
    # Imported here so the block does not depend on a module-level import
    from functools import wraps

    def decorator(func):
        last_calls = {}
        lock = threading.Lock()

        # wraps keeps the decorated function's name, docstring and other
        # metadata visible on the wrapper
        @wraps(func)
        def wrapper(*args, **kwargs):
            if interval is None:
                return func(*args, **kwargs)

            key = (args, freeze(kwargs))
            now = time.time()
            # Cheap check without the lock for the common throttled case
            if now - last_calls.get(key, 0) < interval:
                return None
            with lock:
                # Check again under the lock: another thread may have
                # recorded a call in the meantime
                if now - last_calls.get(key, 0) < interval:
                    return None
                last_calls[key] = now
            return func(*args, **kwargs)

        return wrapper

    return decorator