348 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			348 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
import base64
 | 
						|
import logging
 | 
						|
import mimetypes
 | 
						|
import uuid
 | 
						|
 | 
						|
import aiohttp
 | 
						|
from authlib.integrations.starlette_client import OAuth
 | 
						|
from authlib.oidc.core import UserInfo
 | 
						|
from fastapi import (
 | 
						|
    HTTPException,
 | 
						|
    status,
 | 
						|
)
 | 
						|
from starlette.responses import RedirectResponse
 | 
						|
 | 
						|
from open_webui.models.auths import Auths
 | 
						|
from open_webui.models.users import Users
 | 
						|
from open_webui.models.groups import Groups, GroupModel, GroupUpdateForm
 | 
						|
from open_webui.config import (
 | 
						|
    DEFAULT_USER_ROLE,
 | 
						|
    ENABLE_OAUTH_SIGNUP,
 | 
						|
    OAUTH_MERGE_ACCOUNTS_BY_EMAIL,
 | 
						|
    OAUTH_PROVIDERS,
 | 
						|
    ENABLE_OAUTH_ROLE_MANAGEMENT,
 | 
						|
    ENABLE_OAUTH_GROUP_MANAGEMENT,
 | 
						|
    OAUTH_ROLES_CLAIM,
 | 
						|
    OAUTH_GROUPS_CLAIM,
 | 
						|
    OAUTH_EMAIL_CLAIM,
 | 
						|
    OAUTH_PICTURE_CLAIM,
 | 
						|
    OAUTH_USERNAME_CLAIM,
 | 
						|
    OAUTH_ALLOWED_ROLES,
 | 
						|
    OAUTH_ADMIN_ROLES,
 | 
						|
    OAUTH_ALLOWED_DOMAINS,
 | 
						|
    WEBHOOK_URL,
 | 
						|
    JWT_EXPIRES_IN,
 | 
						|
    AppConfig,
 | 
						|
)
 | 
						|
from open_webui.constants import ERROR_MESSAGES
 | 
						|
from open_webui.env import WEBUI_SESSION_COOKIE_SAME_SITE, WEBUI_SESSION_COOKIE_SECURE
 | 
						|
from open_webui.utils.misc import parse_duration
 | 
						|
from open_webui.utils.auth import get_password_hash, create_token
 | 
						|
from open_webui.utils.webhook import post_webhook
 | 
						|
 | 
						|
log = logging.getLogger(__name__)
 | 
						|
 | 
						|
auth_manager_config = AppConfig()
 | 
						|
auth_manager_config.DEFAULT_USER_ROLE = DEFAULT_USER_ROLE
 | 
						|
auth_manager_config.ENABLE_OAUTH_SIGNUP = ENABLE_OAUTH_SIGNUP
 | 
						|
auth_manager_config.OAUTH_MERGE_ACCOUNTS_BY_EMAIL = OAUTH_MERGE_ACCOUNTS_BY_EMAIL
 | 
						|
auth_manager_config.ENABLE_OAUTH_ROLE_MANAGEMENT = ENABLE_OAUTH_ROLE_MANAGEMENT
 | 
						|
auth_manager_config.ENABLE_OAUTH_GROUP_MANAGEMENT = ENABLE_OAUTH_GROUP_MANAGEMENT
 | 
						|
auth_manager_config.OAUTH_ROLES_CLAIM = OAUTH_ROLES_CLAIM
 | 
						|
auth_manager_config.OAUTH_GROUPS_CLAIM = OAUTH_GROUPS_CLAIM
 | 
						|
auth_manager_config.OAUTH_EMAIL_CLAIM = OAUTH_EMAIL_CLAIM
 | 
						|
auth_manager_config.OAUTH_PICTURE_CLAIM = OAUTH_PICTURE_CLAIM
 | 
						|
auth_manager_config.OAUTH_USERNAME_CLAIM = OAUTH_USERNAME_CLAIM
 | 
						|
auth_manager_config.OAUTH_ALLOWED_ROLES = OAUTH_ALLOWED_ROLES
 | 
						|
auth_manager_config.OAUTH_ADMIN_ROLES = OAUTH_ADMIN_ROLES
 | 
						|
auth_manager_config.OAUTH_ALLOWED_DOMAINS = OAUTH_ALLOWED_DOMAINS
 | 
						|
auth_manager_config.WEBHOOK_URL = WEBHOOK_URL
 | 
						|
auth_manager_config.JWT_EXPIRES_IN = JWT_EXPIRES_IN
 | 
						|
 | 
						|
 | 
						|
class OAuthManager:
 | 
						|
    def __init__(self):
 | 
						|
        self.oauth = OAuth()
 | 
						|
        for provider_name, provider_config in OAUTH_PROVIDERS.items():
 | 
						|
            self.oauth.register(
 | 
						|
                name=provider_name,
 | 
						|
                client_id=provider_config["client_id"],
 | 
						|
                client_secret=provider_config["client_secret"],
 | 
						|
                server_metadata_url=provider_config["server_metadata_url"],
 | 
						|
                client_kwargs={
 | 
						|
                    "scope": provider_config["scope"],
 | 
						|
                },
 | 
						|
                redirect_uri=provider_config["redirect_uri"],
 | 
						|
            )
 | 
						|
 | 
						|
    def get_client(self, provider_name):
 | 
						|
        return self.oauth.create_client(provider_name)
 | 
						|
 | 
						|
    def get_user_role(self, user, user_data):
 | 
						|
        if user and Users.get_num_users() == 1:
 | 
						|
            # If the user is the only user, assign the role "admin" - actually repairs role for single user on login
 | 
						|
            return "admin"
 | 
						|
        if not user and Users.get_num_users() == 0:
 | 
						|
            # If there are no users, assign the role "admin", as the first user will be an admin
 | 
						|
            return "admin"
 | 
						|
 | 
						|
        if auth_manager_config.ENABLE_OAUTH_ROLE_MANAGEMENT:
 | 
						|
            oauth_claim = auth_manager_config.OAUTH_ROLES_CLAIM
 | 
						|
            oauth_allowed_roles = auth_manager_config.OAUTH_ALLOWED_ROLES
 | 
						|
            oauth_admin_roles = auth_manager_config.OAUTH_ADMIN_ROLES
 | 
						|
            oauth_roles = None
 | 
						|
            role = "pending"  # Default/fallback role if no matching roles are found
 | 
						|
 | 
						|
            # Next block extracts the roles from the user data, accepting nested claims of any depth
 | 
						|
            if oauth_claim and oauth_allowed_roles and oauth_admin_roles:
 | 
						|
                claim_data = user_data
 | 
						|
                nested_claims = oauth_claim.split(".")
 | 
						|
                for nested_claim in nested_claims:
 | 
						|
                    claim_data = claim_data.get(nested_claim, {})
 | 
						|
                oauth_roles = claim_data if isinstance(claim_data, list) else None
 | 
						|
 | 
						|
            # If any roles are found, check if they match the allowed or admin roles
 | 
						|
            if oauth_roles:
 | 
						|
                # If role management is enabled, and matching roles are provided, use the roles
 | 
						|
                for allowed_role in oauth_allowed_roles:
 | 
						|
                    # If the user has any of the allowed roles, assign the role "user"
 | 
						|
                    if allowed_role in oauth_roles:
 | 
						|
                        role = "user"
 | 
						|
                        break
 | 
						|
                for admin_role in oauth_admin_roles:
 | 
						|
                    # If the user has any of the admin roles, assign the role "admin"
 | 
						|
                    if admin_role in oauth_roles:
 | 
						|
                        role = "admin"
 | 
						|
                        break
 | 
						|
        else:
 | 
						|
            if not user:
 | 
						|
                # If role management is disabled, use the default role for new users
 | 
						|
                role = auth_manager_config.DEFAULT_USER_ROLE
 | 
						|
            else:
 | 
						|
                # If role management is disabled, use the existing role for existing users
 | 
						|
                role = user.role
 | 
						|
 | 
						|
        return role
 | 
						|
 | 
						|
    def update_user_groups(self, user, user_data, default_permissions):
 | 
						|
        oauth_claim = auth_manager_config.OAUTH_GROUPS_CLAIM
 | 
						|
 | 
						|
        user_oauth_groups: list[str] = user_data.get(oauth_claim, list())
 | 
						|
        user_current_groups: list[GroupModel] = Groups.get_groups_by_member_id(user.id)
 | 
						|
        all_available_groups: list[GroupModel] = Groups.get_groups()
 | 
						|
 | 
						|
        # Remove groups that user is no longer a part of
 | 
						|
        for group_model in user_current_groups:
 | 
						|
            if group_model.name not in user_oauth_groups:
 | 
						|
                # Remove group from user
 | 
						|
 | 
						|
                user_ids = group_model.user_ids
 | 
						|
                user_ids = [i for i in user_ids if i != user.id]
 | 
						|
 | 
						|
                # In case a group is created, but perms are never assigned to the group by hitting "save"
 | 
						|
                group_permissions = group_model.permissions
 | 
						|
                if not group_permissions:
 | 
						|
                    group_permissions = default_permissions
 | 
						|
 | 
						|
                update_form = GroupUpdateForm(
 | 
						|
                    name=group_model.name,
 | 
						|
                    description=group_model.description,
 | 
						|
                    permissions=group_permissions,
 | 
						|
                    user_ids=user_ids,
 | 
						|
                )
 | 
						|
                Groups.update_group_by_id(
 | 
						|
                    id=group_model.id, form_data=update_form, overwrite=False
 | 
						|
                )
 | 
						|
 | 
						|
        # Add user to new groups
 | 
						|
        for group_model in all_available_groups:
 | 
						|
            if group_model.name in user_oauth_groups and not any(
 | 
						|
                gm.name == group_model.name for gm in user_current_groups
 | 
						|
            ):
 | 
						|
                # Add user to group
 | 
						|
 | 
						|
                user_ids = group_model.user_ids
 | 
						|
                user_ids.append(user.id)
 | 
						|
 | 
						|
                # In case a group is created, but perms are never assigned to the group by hitting "save"
 | 
						|
                group_permissions = group_model.permissions
 | 
						|
                if not group_permissions:
 | 
						|
                    group_permissions = default_permissions
 | 
						|
 | 
						|
                update_form = GroupUpdateForm(
 | 
						|
                    name=group_model.name,
 | 
						|
                    description=group_model.description,
 | 
						|
                    permissions=group_permissions,
 | 
						|
                    user_ids=user_ids,
 | 
						|
                )
 | 
						|
                Groups.update_group_by_id(
 | 
						|
                    id=group_model.id, form_data=update_form, overwrite=False
 | 
						|
                )
 | 
						|
 | 
						|
    async def handle_login(self, provider, request):
 | 
						|
        if provider not in OAUTH_PROVIDERS:
 | 
						|
            raise HTTPException(404)
 | 
						|
        # If the provider has a custom redirect URL, use that, otherwise automatically generate one
 | 
						|
        redirect_uri = OAUTH_PROVIDERS[provider].get("redirect_uri") or request.url_for(
 | 
						|
            "oauth_callback", provider=provider
 | 
						|
        )
 | 
						|
        client = self.get_client(provider)
 | 
						|
        if client is None:
 | 
						|
            raise HTTPException(404)
 | 
						|
        return await client.authorize_redirect(request, redirect_uri)
 | 
						|
 | 
						|
    async def handle_callback(self, provider, request, response):
 | 
						|
        if provider not in OAUTH_PROVIDERS:
 | 
						|
            raise HTTPException(404)
 | 
						|
        client = self.get_client(provider)
 | 
						|
        try:
 | 
						|
            token = await client.authorize_access_token(request)
 | 
						|
        except Exception as e:
 | 
						|
            log.warning(f"OAuth callback error: {e}")
 | 
						|
            raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
 | 
						|
        user_data: UserInfo = token["userinfo"]
 | 
						|
        if not user_data:
 | 
						|
            user_data: UserInfo = await client.userinfo(token=token)
 | 
						|
        if not user_data:
 | 
						|
            log.warning(f"OAuth callback failed, user data is missing: {token}")
 | 
						|
            raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
 | 
						|
 | 
						|
        sub = user_data.get("sub")
 | 
						|
        if not sub:
 | 
						|
            log.warning(f"OAuth callback failed, sub is missing: {user_data}")
 | 
						|
            raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
 | 
						|
        provider_sub = f"{provider}@{sub}"
 | 
						|
        email_claim = auth_manager_config.OAUTH_EMAIL_CLAIM
 | 
						|
        email = user_data.get(email_claim, "").lower()
 | 
						|
        # We currently mandate that email addresses are provided
 | 
						|
        if not email:
 | 
						|
            log.warning(f"OAuth callback failed, email is missing: {user_data}")
 | 
						|
            raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
 | 
						|
        if (
 | 
						|
            "*" not in auth_manager_config.OAUTH_ALLOWED_DOMAINS
 | 
						|
            and email.split("@")[-1] not in auth_manager_config.OAUTH_ALLOWED_DOMAINS
 | 
						|
        ):
 | 
						|
            log.warning(
 | 
						|
                f"OAuth callback failed, e-mail domain is not in the list of allowed domains: {user_data}"
 | 
						|
            )
 | 
						|
            raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
 | 
						|
 | 
						|
        # Check if the user exists
 | 
						|
        user = Users.get_user_by_oauth_sub(provider_sub)
 | 
						|
 | 
						|
        if not user:
 | 
						|
            # If the user does not exist, check if merging is enabled
 | 
						|
            if auth_manager_config.OAUTH_MERGE_ACCOUNTS_BY_EMAIL:
 | 
						|
                # Check if the user exists by email
 | 
						|
                user = Users.get_user_by_email(email)
 | 
						|
                if user:
 | 
						|
                    # Update the user with the new oauth sub
 | 
						|
                    Users.update_user_oauth_sub_by_id(user.id, provider_sub)
 | 
						|
 | 
						|
        if user:
 | 
						|
            determined_role = self.get_user_role(user, user_data)
 | 
						|
            if user.role != determined_role:
 | 
						|
                Users.update_user_role_by_id(user.id, determined_role)
 | 
						|
 | 
						|
        if not user:
 | 
						|
            # If the user does not exist, check if signups are enabled
 | 
						|
            if auth_manager_config.ENABLE_OAUTH_SIGNUP:
 | 
						|
                # Check if an existing user with the same email already exists
 | 
						|
                existing_user = Users.get_user_by_email(
 | 
						|
                    user_data.get("email", "").lower()
 | 
						|
                )
 | 
						|
                if existing_user:
 | 
						|
                    raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN)
 | 
						|
 | 
						|
                picture_claim = auth_manager_config.OAUTH_PICTURE_CLAIM
 | 
						|
                picture_url = user_data.get(picture_claim, "")
 | 
						|
                if picture_url:
 | 
						|
                    # Download the profile image into a base64 string
 | 
						|
                    try:
 | 
						|
                        async with aiohttp.ClientSession() as session:
 | 
						|
                            async with session.get(picture_url) as resp:
 | 
						|
                                picture = await resp.read()
 | 
						|
                                base64_encoded_picture = base64.b64encode(
 | 
						|
                                    picture
 | 
						|
                                ).decode("utf-8")
 | 
						|
                                guessed_mime_type = mimetypes.guess_type(picture_url)[0]
 | 
						|
                                if guessed_mime_type is None:
 | 
						|
                                    # assume JPG, browsers are tolerant enough of image formats
 | 
						|
                                    guessed_mime_type = "image/jpeg"
 | 
						|
                                picture_url = f"data:{guessed_mime_type};base64,{base64_encoded_picture}"
 | 
						|
                    except Exception as e:
 | 
						|
                        log.error(
 | 
						|
                            f"Error downloading profile image '{picture_url}': {e}"
 | 
						|
                        )
 | 
						|
                        picture_url = ""
 | 
						|
                if not picture_url:
 | 
						|
                    picture_url = "/user.png"
 | 
						|
                username_claim = auth_manager_config.OAUTH_USERNAME_CLAIM
 | 
						|
 | 
						|
                role = self.get_user_role(None, user_data)
 | 
						|
 | 
						|
                user = Auths.insert_new_auth(
 | 
						|
                    email=email,
 | 
						|
                    password=get_password_hash(
 | 
						|
                        str(uuid.uuid4())
 | 
						|
                    ),  # Random password, not used
 | 
						|
                    name=user_data.get(username_claim, "User"),
 | 
						|
                    profile_image_url=picture_url,
 | 
						|
                    role=role,
 | 
						|
                    oauth_sub=provider_sub,
 | 
						|
                )
 | 
						|
 | 
						|
                if auth_manager_config.WEBHOOK_URL:
 | 
						|
                    post_webhook(
 | 
						|
                        auth_manager_config.WEBHOOK_URL,
 | 
						|
                        auth_manager_config.WEBHOOK_MESSAGES.USER_SIGNUP(user.name),
 | 
						|
                        {
 | 
						|
                            "action": "signup",
 | 
						|
                            "message": auth_manager_config.WEBHOOK_MESSAGES.USER_SIGNUP(
 | 
						|
                                user.name
 | 
						|
                            ),
 | 
						|
                            "user": user.model_dump_json(exclude_none=True),
 | 
						|
                        },
 | 
						|
                    )
 | 
						|
            else:
 | 
						|
                raise HTTPException(
 | 
						|
                    status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACCESS_PROHIBITED
 | 
						|
                )
 | 
						|
 | 
						|
        jwt_token = create_token(
 | 
						|
            data={"id": user.id},
 | 
						|
            expires_delta=parse_duration(auth_manager_config.JWT_EXPIRES_IN),
 | 
						|
        )
 | 
						|
 | 
						|
        if auth_manager_config.ENABLE_OAUTH_GROUP_MANAGEMENT:
 | 
						|
            self.update_user_groups(
 | 
						|
                user=user,
 | 
						|
                user_data=user_data,
 | 
						|
                default_permissions=request.app.state.config.USER_PERMISSIONS,
 | 
						|
            )
 | 
						|
 | 
						|
        # Set the cookie token
 | 
						|
        response.set_cookie(
 | 
						|
            key="token",
 | 
						|
            value=jwt_token,
 | 
						|
            httponly=True,  # Ensures the cookie is not accessible via JavaScript
 | 
						|
            samesite=WEBUI_SESSION_COOKIE_SAME_SITE,
 | 
						|
            secure=WEBUI_SESSION_COOKIE_SECURE,
 | 
						|
        )
 | 
						|
 | 
						|
        if ENABLE_OAUTH_SIGNUP.value:
 | 
						|
            oauth_id_token = token.get("id_token")
 | 
						|
            response.set_cookie(
 | 
						|
                key="oauth_id_token",
 | 
						|
                value=oauth_id_token,
 | 
						|
                httponly=True,
 | 
						|
                samesite=WEBUI_SESSION_COOKIE_SAME_SITE,
 | 
						|
                secure=WEBUI_SESSION_COOKIE_SECURE,
 | 
						|
            )
 | 
						|
        # Redirect back to the frontend with the JWT token
 | 
						|
        redirect_url = f"{request.base_url}auth#token={jwt_token}"
 | 
						|
        return RedirectResponse(url=redirect_url, headers=response.headers)
 | 
						|
 | 
						|
 | 
						|
oauth_manager = OAuthManager()
 |