532 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			532 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
import logging
 | 
						|
from typing import Optional
 | 
						|
import base64
 | 
						|
import io
 | 
						|
 | 
						|
 | 
						|
from fastapi import APIRouter, Depends, HTTPException, Request, status
 | 
						|
from fastapi.responses import Response, StreamingResponse, FileResponse
 | 
						|
from pydantic import BaseModel
 | 
						|
 | 
						|
 | 
						|
from open_webui.models.auths import Auths
 | 
						|
from open_webui.models.oauth_sessions import OAuthSessions
 | 
						|
 | 
						|
from open_webui.models.groups import Groups
 | 
						|
from open_webui.models.chats import Chats
 | 
						|
from open_webui.models.users import (
 | 
						|
    UserModel,
 | 
						|
    UserListResponse,
 | 
						|
    UserInfoListResponse,
 | 
						|
    UserRoleUpdateForm,
 | 
						|
    Users,
 | 
						|
    UserSettings,
 | 
						|
    UserUpdateForm,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
from open_webui.socket.main import (
 | 
						|
    get_active_status_by_user_id,
 | 
						|
    get_active_user_ids,
 | 
						|
    get_user_active_status,
 | 
						|
)
 | 
						|
from open_webui.constants import ERROR_MESSAGES
 | 
						|
from open_webui.env import SRC_LOG_LEVELS, STATIC_DIR
 | 
						|
 | 
						|
 | 
						|
from open_webui.utils.auth import get_admin_user, get_password_hash, get_verified_user
 | 
						|
from open_webui.utils.access_control import get_permissions, has_permission
 | 
						|
 | 
						|
 | 
						|
log = logging.getLogger(__name__)
 | 
						|
log.setLevel(SRC_LOG_LEVELS["MODELS"])
 | 
						|
 | 
						|
router = APIRouter()
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# GetActiveUsers
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.get("/active")
 | 
						|
async def get_active_users(
 | 
						|
    user=Depends(get_verified_user),
 | 
						|
):
 | 
						|
    """
 | 
						|
    Get a list of active users.
 | 
						|
    """
 | 
						|
    return {
 | 
						|
        "user_ids": get_active_user_ids(),
 | 
						|
    }
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# GetUsers
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
PAGE_ITEM_COUNT = 30
 | 
						|
 | 
						|
 | 
						|
@router.get("/", response_model=UserListResponse)
 | 
						|
async def get_users(
 | 
						|
    query: Optional[str] = None,
 | 
						|
    order_by: Optional[str] = None,
 | 
						|
    direction: Optional[str] = None,
 | 
						|
    page: Optional[int] = 1,
 | 
						|
    user=Depends(get_admin_user),
 | 
						|
):
 | 
						|
    limit = PAGE_ITEM_COUNT
 | 
						|
 | 
						|
    page = max(1, page)
 | 
						|
    skip = (page - 1) * limit
 | 
						|
 | 
						|
    filter = {}
 | 
						|
    if query:
 | 
						|
        filter["query"] = query
 | 
						|
    if order_by:
 | 
						|
        filter["order_by"] = order_by
 | 
						|
    if direction:
 | 
						|
        filter["direction"] = direction
 | 
						|
 | 
						|
    return Users.get_users(filter=filter, skip=skip, limit=limit)
 | 
						|
 | 
						|
 | 
						|
@router.get("/all", response_model=UserInfoListResponse)
 | 
						|
async def get_all_users(
 | 
						|
    user=Depends(get_admin_user),
 | 
						|
):
 | 
						|
    return Users.get_users()
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# User Groups
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.get("/groups")
 | 
						|
async def get_user_groups(user=Depends(get_verified_user)):
 | 
						|
    return Groups.get_groups_by_member_id(user.id)
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# User Permissions
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.get("/permissions")
 | 
						|
async def get_user_permissisions(request: Request, user=Depends(get_verified_user)):
 | 
						|
    user_permissions = get_permissions(
 | 
						|
        user.id, request.app.state.config.USER_PERMISSIONS
 | 
						|
    )
 | 
						|
 | 
						|
    return user_permissions
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# User Default Permissions
 | 
						|
############################
 | 
						|
class WorkspacePermissions(BaseModel):
 | 
						|
    models: bool = False
 | 
						|
    knowledge: bool = False
 | 
						|
    prompts: bool = False
 | 
						|
    tools: bool = False
 | 
						|
 | 
						|
 | 
						|
class SharingPermissions(BaseModel):
 | 
						|
    public_models: bool = True
 | 
						|
    public_knowledge: bool = True
 | 
						|
    public_prompts: bool = True
 | 
						|
    public_tools: bool = True
 | 
						|
 | 
						|
 | 
						|
class ChatPermissions(BaseModel):
 | 
						|
    controls: bool = True
 | 
						|
    valves: bool = True
 | 
						|
    system_prompt: bool = True
 | 
						|
    params: bool = True
 | 
						|
    file_upload: bool = True
 | 
						|
    delete: bool = True
 | 
						|
    delete_message: bool = True
 | 
						|
    continue_response: bool = True
 | 
						|
    regenerate_response: bool = True
 | 
						|
    rate_response: bool = True
 | 
						|
    edit: bool = True
 | 
						|
    share: bool = True
 | 
						|
    export: bool = True
 | 
						|
    stt: bool = True
 | 
						|
    tts: bool = True
 | 
						|
    call: bool = True
 | 
						|
    multiple_models: bool = True
 | 
						|
    temporary: bool = True
 | 
						|
    temporary_enforced: bool = False
 | 
						|
 | 
						|
 | 
						|
class FeaturesPermissions(BaseModel):
 | 
						|
    direct_tool_servers: bool = False
 | 
						|
    web_search: bool = True
 | 
						|
    image_generation: bool = True
 | 
						|
    code_interpreter: bool = True
 | 
						|
    notes: bool = True
 | 
						|
 | 
						|
 | 
						|
class UserPermissions(BaseModel):
 | 
						|
    workspace: WorkspacePermissions
 | 
						|
    sharing: SharingPermissions
 | 
						|
    chat: ChatPermissions
 | 
						|
    features: FeaturesPermissions
 | 
						|
 | 
						|
 | 
						|
@router.get("/default/permissions", response_model=UserPermissions)
 | 
						|
async def get_default_user_permissions(request: Request, user=Depends(get_admin_user)):
 | 
						|
    return {
 | 
						|
        "workspace": WorkspacePermissions(
 | 
						|
            **request.app.state.config.USER_PERMISSIONS.get("workspace", {})
 | 
						|
        ),
 | 
						|
        "sharing": SharingPermissions(
 | 
						|
            **request.app.state.config.USER_PERMISSIONS.get("sharing", {})
 | 
						|
        ),
 | 
						|
        "chat": ChatPermissions(
 | 
						|
            **request.app.state.config.USER_PERMISSIONS.get("chat", {})
 | 
						|
        ),
 | 
						|
        "features": FeaturesPermissions(
 | 
						|
            **request.app.state.config.USER_PERMISSIONS.get("features", {})
 | 
						|
        ),
 | 
						|
    }
 | 
						|
 | 
						|
 | 
						|
@router.post("/default/permissions")
 | 
						|
async def update_default_user_permissions(
 | 
						|
    request: Request, form_data: UserPermissions, user=Depends(get_admin_user)
 | 
						|
):
 | 
						|
    request.app.state.config.USER_PERMISSIONS = form_data.model_dump()
 | 
						|
    return request.app.state.config.USER_PERMISSIONS
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# GetUserSettingsBySessionUser
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.get("/user/settings", response_model=Optional[UserSettings])
 | 
						|
async def get_user_settings_by_session_user(user=Depends(get_verified_user)):
 | 
						|
    user = Users.get_user_by_id(user.id)
 | 
						|
    if user:
 | 
						|
        return user.settings
 | 
						|
    else:
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
            detail=ERROR_MESSAGES.USER_NOT_FOUND,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# UpdateUserSettingsBySessionUser
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.post("/user/settings/update", response_model=UserSettings)
 | 
						|
async def update_user_settings_by_session_user(
 | 
						|
    request: Request, form_data: UserSettings, user=Depends(get_verified_user)
 | 
						|
):
 | 
						|
    updated_user_settings = form_data.model_dump()
 | 
						|
    if (
 | 
						|
        user.role != "admin"
 | 
						|
        and "toolServers" in updated_user_settings.get("ui").keys()
 | 
						|
        and not has_permission(
 | 
						|
            user.id,
 | 
						|
            "features.direct_tool_servers",
 | 
						|
            request.app.state.config.USER_PERMISSIONS,
 | 
						|
        )
 | 
						|
    ):
 | 
						|
        # If the user is not an admin and does not have permission to use tool servers, remove the key
 | 
						|
        updated_user_settings["ui"].pop("toolServers", None)
 | 
						|
 | 
						|
    user = Users.update_user_settings_by_id(user.id, updated_user_settings)
 | 
						|
    if user:
 | 
						|
        return user.settings
 | 
						|
    else:
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
            detail=ERROR_MESSAGES.USER_NOT_FOUND,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# GetUserInfoBySessionUser
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.get("/user/info", response_model=Optional[dict])
 | 
						|
async def get_user_info_by_session_user(user=Depends(get_verified_user)):
 | 
						|
    user = Users.get_user_by_id(user.id)
 | 
						|
    if user:
 | 
						|
        return user.info
 | 
						|
    else:
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
            detail=ERROR_MESSAGES.USER_NOT_FOUND,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# UpdateUserInfoBySessionUser
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.post("/user/info/update", response_model=Optional[dict])
 | 
						|
async def update_user_info_by_session_user(
 | 
						|
    form_data: dict, user=Depends(get_verified_user)
 | 
						|
):
 | 
						|
    user = Users.get_user_by_id(user.id)
 | 
						|
    if user:
 | 
						|
        if user.info is None:
 | 
						|
            user.info = {}
 | 
						|
 | 
						|
        user = Users.update_user_by_id(user.id, {"info": {**user.info, **form_data}})
 | 
						|
        if user:
 | 
						|
            return user.info
 | 
						|
        else:
 | 
						|
            raise HTTPException(
 | 
						|
                status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
                detail=ERROR_MESSAGES.USER_NOT_FOUND,
 | 
						|
            )
 | 
						|
    else:
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
            detail=ERROR_MESSAGES.USER_NOT_FOUND,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# GetUserById
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
class UserResponse(BaseModel):
 | 
						|
    name: str
 | 
						|
    profile_image_url: str
 | 
						|
    active: Optional[bool] = None
 | 
						|
 | 
						|
 | 
						|
@router.get("/{user_id}", response_model=UserResponse)
 | 
						|
async def get_user_by_id(user_id: str, user=Depends(get_verified_user)):
 | 
						|
    # Check if user_id is a shared chat
 | 
						|
    # If it is, get the user_id from the chat
 | 
						|
    if user_id.startswith("shared-"):
 | 
						|
        chat_id = user_id.replace("shared-", "")
 | 
						|
        chat = Chats.get_chat_by_id(chat_id)
 | 
						|
        if chat:
 | 
						|
            user_id = chat.user_id
 | 
						|
        else:
 | 
						|
            raise HTTPException(
 | 
						|
                status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
                detail=ERROR_MESSAGES.USER_NOT_FOUND,
 | 
						|
            )
 | 
						|
 | 
						|
    user = Users.get_user_by_id(user_id)
 | 
						|
 | 
						|
    if user:
 | 
						|
        return UserResponse(
 | 
						|
            **{
 | 
						|
                "name": user.name,
 | 
						|
                "profile_image_url": user.profile_image_url,
 | 
						|
                "active": get_active_status_by_user_id(user_id),
 | 
						|
            }
 | 
						|
        )
 | 
						|
    else:
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
            detail=ERROR_MESSAGES.USER_NOT_FOUND,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
@router.get("/{user_id}/oauth/sessions", response_model=Optional[dict])
 | 
						|
async def get_user_oauth_sessions_by_id(user_id: str, user=Depends(get_admin_user)):
 | 
						|
    sessions = OAuthSessions.get_sessions_by_user_id(user_id)
 | 
						|
    if sessions and len(sessions) > 0:
 | 
						|
        return sessions
 | 
						|
    else:
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
            detail=ERROR_MESSAGES.USER_NOT_FOUND,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# GetUserProfileImageById
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.get("/{user_id}/profile/image")
 | 
						|
async def get_user_profile_image_by_id(user_id: str, user=Depends(get_verified_user)):
 | 
						|
    user = Users.get_user_by_id(user_id)
 | 
						|
    if user:
 | 
						|
        if user.profile_image_url:
 | 
						|
            # check if it's url or base64
 | 
						|
            if user.profile_image_url.startswith("http"):
 | 
						|
                return Response(
 | 
						|
                    status_code=status.HTTP_302_FOUND,
 | 
						|
                    headers={"Location": user.profile_image_url},
 | 
						|
                )
 | 
						|
            elif user.profile_image_url.startswith("data:image"):
 | 
						|
                try:
 | 
						|
                    header, base64_data = user.profile_image_url.split(",", 1)
 | 
						|
                    image_data = base64.b64decode(base64_data)
 | 
						|
                    image_buffer = io.BytesIO(image_data)
 | 
						|
 | 
						|
                    return StreamingResponse(
 | 
						|
                        image_buffer,
 | 
						|
                        media_type="image/png",
 | 
						|
                        headers={"Content-Disposition": "inline; filename=image.png"},
 | 
						|
                    )
 | 
						|
                except Exception as e:
 | 
						|
                    pass
 | 
						|
        return FileResponse(f"{STATIC_DIR}/user.png")
 | 
						|
    else:
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
            detail=ERROR_MESSAGES.USER_NOT_FOUND,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# GetUserActiveStatusById
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.get("/{user_id}/active", response_model=dict)
 | 
						|
async def get_user_active_status_by_id(user_id: str, user=Depends(get_verified_user)):
 | 
						|
    return {
 | 
						|
        "active": get_user_active_status(user_id),
 | 
						|
    }
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# UpdateUserById
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.post("/{user_id}/update", response_model=Optional[UserModel])
 | 
						|
async def update_user_by_id(
 | 
						|
    user_id: str,
 | 
						|
    form_data: UserUpdateForm,
 | 
						|
    session_user=Depends(get_admin_user),
 | 
						|
):
 | 
						|
    # Prevent modification of the primary admin user by other admins
 | 
						|
    try:
 | 
						|
        first_user = Users.get_first_user()
 | 
						|
        if first_user:
 | 
						|
            if user_id == first_user.id:
 | 
						|
                if session_user.id != user_id:
 | 
						|
                    # If the user trying to update is the primary admin, and they are not the primary admin themselves
 | 
						|
                    raise HTTPException(
 | 
						|
                        status_code=status.HTTP_403_FORBIDDEN,
 | 
						|
                        detail=ERROR_MESSAGES.ACTION_PROHIBITED,
 | 
						|
                    )
 | 
						|
 | 
						|
                if form_data.role != "admin":
 | 
						|
                    # If the primary admin is trying to change their own role, prevent it
 | 
						|
                    raise HTTPException(
 | 
						|
                        status_code=status.HTTP_403_FORBIDDEN,
 | 
						|
                        detail=ERROR_MESSAGES.ACTION_PROHIBITED,
 | 
						|
                    )
 | 
						|
 | 
						|
    except Exception as e:
 | 
						|
        log.error(f"Error checking primary admin status: {e}")
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
 | 
						|
            detail="Could not verify primary admin status.",
 | 
						|
        )
 | 
						|
 | 
						|
    user = Users.get_user_by_id(user_id)
 | 
						|
 | 
						|
    if user:
 | 
						|
        if form_data.email.lower() != user.email:
 | 
						|
            email_user = Users.get_user_by_email(form_data.email.lower())
 | 
						|
            if email_user:
 | 
						|
                raise HTTPException(
 | 
						|
                    status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
                    detail=ERROR_MESSAGES.EMAIL_TAKEN,
 | 
						|
                )
 | 
						|
 | 
						|
        if form_data.password:
 | 
						|
            hashed = get_password_hash(form_data.password)
 | 
						|
            log.debug(f"hashed: {hashed}")
 | 
						|
            Auths.update_user_password_by_id(user_id, hashed)
 | 
						|
 | 
						|
        Auths.update_email_by_id(user_id, form_data.email.lower())
 | 
						|
        updated_user = Users.update_user_by_id(
 | 
						|
            user_id,
 | 
						|
            {
 | 
						|
                "role": form_data.role,
 | 
						|
                "name": form_data.name,
 | 
						|
                "email": form_data.email.lower(),
 | 
						|
                "profile_image_url": form_data.profile_image_url,
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
        if updated_user:
 | 
						|
            return updated_user
 | 
						|
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
            detail=ERROR_MESSAGES.DEFAULT(),
 | 
						|
        )
 | 
						|
 | 
						|
    raise HTTPException(
 | 
						|
        status_code=status.HTTP_400_BAD_REQUEST,
 | 
						|
        detail=ERROR_MESSAGES.USER_NOT_FOUND,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# DeleteUserById
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.delete("/{user_id}", response_model=bool)
 | 
						|
async def delete_user_by_id(user_id: str, user=Depends(get_admin_user)):
 | 
						|
    # Prevent deletion of the primary admin user
 | 
						|
    try:
 | 
						|
        first_user = Users.get_first_user()
 | 
						|
        if first_user and user_id == first_user.id:
 | 
						|
            raise HTTPException(
 | 
						|
                status_code=status.HTTP_403_FORBIDDEN,
 | 
						|
                detail=ERROR_MESSAGES.ACTION_PROHIBITED,
 | 
						|
            )
 | 
						|
    except Exception as e:
 | 
						|
        log.error(f"Error checking primary admin status: {e}")
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
 | 
						|
            detail="Could not verify primary admin status.",
 | 
						|
        )
 | 
						|
 | 
						|
    if user.id != user_id:
 | 
						|
        result = Auths.delete_auth_by_id(user_id)
 | 
						|
 | 
						|
        if result:
 | 
						|
            return True
 | 
						|
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
 | 
						|
            detail=ERROR_MESSAGES.DELETE_USER_ERROR,
 | 
						|
        )
 | 
						|
 | 
						|
    # Prevent self-deletion
 | 
						|
    raise HTTPException(
 | 
						|
        status_code=status.HTTP_403_FORBIDDEN,
 | 
						|
        detail=ERROR_MESSAGES.ACTION_PROHIBITED,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
############################
 | 
						|
# GetUserGroupsById
 | 
						|
############################
 | 
						|
 | 
						|
 | 
						|
@router.get("/{user_id}/groups")
 | 
						|
async def get_user_groups_by_id(user_id: str, user=Depends(get_admin_user)):
 | 
						|
    return Groups.get_groups_by_member_id(user_id)
 |