222 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
	
		
		
			
		
	
	
			222 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
	
| 
								 | 
							
								import json
							 | 
						||
| 
								 | 
							
								import logging
							 | 
						||
| 
								 | 
							
								import time
							 | 
						||
| 
								 | 
							
								from typing import Optional
							 | 
						||
| 
								 | 
							
								import uuid
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from open_webui.internal.db import Base, get_db
							 | 
						||
| 
								 | 
							
								from open_webui.env import SRC_LOG_LEVELS
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from open_webui.models.files import FileMetadataResponse
							 | 
						||
| 
								 | 
							
								from open_webui.models.users import Users, UserResponse
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from pydantic import BaseModel, ConfigDict
							 | 
						||
| 
								 | 
							
								from sqlalchemy import BigInteger, Column, String, Text, JSON
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from open_webui.utils.access_control import has_access
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								log = logging.getLogger(__name__)
							 | 
						||
| 
								 | 
							
								log.setLevel(SRC_LOG_LEVELS["MODELS"])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								####################
							 | 
						||
| 
								 | 
							
								# Knowledge DB Schema
							 | 
						||
| 
								 | 
							
								####################
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class Knowledge(Base):
							 | 
						||
| 
								 | 
							
								    __tablename__ = "knowledge"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    id = Column(Text, unique=True, primary_key=True)
							 | 
						||
| 
								 | 
							
								    user_id = Column(Text)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    name = Column(Text)
							 | 
						||
| 
								 | 
							
								    description = Column(Text)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    data = Column(JSON, nullable=True)
							 | 
						||
| 
								 | 
							
								    meta = Column(JSON, nullable=True)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    access_control = Column(JSON, nullable=True)  # Controls data access levels.
							 | 
						||
| 
								 | 
							
								    # Defines access control rules for this entry.
							 | 
						||
| 
								 | 
							
								    # - `None`: Public access, available to all users with the "user" role.
							 | 
						||
| 
								 | 
							
								    # - `{}`: Private access, restricted exclusively to the owner.
							 | 
						||
| 
								 | 
							
								    # - Custom permissions: Specific access control for reading and writing;
							 | 
						||
| 
								 | 
							
								    #   Can specify group or user-level restrictions:
							 | 
						||
| 
								 | 
							
								    #   {
							 | 
						||
| 
								 | 
							
								    #      "read": {
							 | 
						||
| 
								 | 
							
								    #          "group_ids": ["group_id1", "group_id2"],
							 | 
						||
| 
								 | 
							
								    #          "user_ids":  ["user_id1", "user_id2"]
							 | 
						||
| 
								 | 
							
								    #      },
							 | 
						||
| 
								 | 
							
								    #      "write": {
							 | 
						||
| 
								 | 
							
								    #          "group_ids": ["group_id1", "group_id2"],
							 | 
						||
| 
								 | 
							
								    #          "user_ids":  ["user_id1", "user_id2"]
							 | 
						||
| 
								 | 
							
								    #      }
							 | 
						||
| 
								 | 
							
								    #   }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    created_at = Column(BigInteger)
							 | 
						||
| 
								 | 
							
								    updated_at = Column(BigInteger)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class KnowledgeModel(BaseModel):
							 | 
						||
| 
								 | 
							
								    model_config = ConfigDict(from_attributes=True)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    id: str
							 | 
						||
| 
								 | 
							
								    user_id: str
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    name: str
							 | 
						||
| 
								 | 
							
								    description: str
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    data: Optional[dict] = None
							 | 
						||
| 
								 | 
							
								    meta: Optional[dict] = None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    access_control: Optional[dict] = None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    created_at: int  # timestamp in epoch
							 | 
						||
| 
								 | 
							
								    updated_at: int  # timestamp in epoch
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								####################
							 | 
						||
| 
								 | 
							
								# Forms
							 | 
						||
| 
								 | 
							
								####################
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class KnowledgeUserModel(KnowledgeModel):
							 | 
						||
| 
								 | 
							
								    user: Optional[UserResponse] = None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class KnowledgeResponse(KnowledgeModel):
							 | 
						||
| 
								 | 
							
								    files: Optional[list[FileMetadataResponse | dict]] = None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class KnowledgeUserResponse(KnowledgeUserModel):
							 | 
						||
| 
								 | 
							
								    files: Optional[list[FileMetadataResponse | dict]] = None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class KnowledgeForm(BaseModel):
							 | 
						||
| 
								 | 
							
								    name: str
							 | 
						||
| 
								 | 
							
								    description: str
							 | 
						||
| 
								 | 
							
								    data: Optional[dict] = None
							 | 
						||
| 
								 | 
							
								    access_control: Optional[dict] = None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class KnowledgeTable:
							 | 
						||
| 
								 | 
							
								    def insert_new_knowledge(
							 | 
						||
| 
								 | 
							
								        self, user_id: str, form_data: KnowledgeForm
							 | 
						||
| 
								 | 
							
								    ) -> Optional[KnowledgeModel]:
							 | 
						||
| 
								 | 
							
								        with get_db() as db:
							 | 
						||
| 
								 | 
							
								            knowledge = KnowledgeModel(
							 | 
						||
| 
								 | 
							
								                **{
							 | 
						||
| 
								 | 
							
								                    **form_data.model_dump(),
							 | 
						||
| 
								 | 
							
								                    "id": str(uuid.uuid4()),
							 | 
						||
| 
								 | 
							
								                    "user_id": user_id,
							 | 
						||
| 
								 | 
							
								                    "created_at": int(time.time()),
							 | 
						||
| 
								 | 
							
								                    "updated_at": int(time.time()),
							 | 
						||
| 
								 | 
							
								                }
							 | 
						||
| 
								 | 
							
								            )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            try:
							 | 
						||
| 
								 | 
							
								                result = Knowledge(**knowledge.model_dump())
							 | 
						||
| 
								 | 
							
								                db.add(result)
							 | 
						||
| 
								 | 
							
								                db.commit()
							 | 
						||
| 
								 | 
							
								                db.refresh(result)
							 | 
						||
| 
								 | 
							
								                if result:
							 | 
						||
| 
								 | 
							
								                    return KnowledgeModel.model_validate(result)
							 | 
						||
| 
								 | 
							
								                else:
							 | 
						||
| 
								 | 
							
								                    return None
							 | 
						||
| 
								 | 
							
								            except Exception:
							 | 
						||
| 
								 | 
							
								                return None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def get_knowledge_bases(self) -> list[KnowledgeUserModel]:
							 | 
						||
| 
								 | 
							
								        with get_db() as db:
							 | 
						||
| 
								 | 
							
								            knowledge_bases = []
							 | 
						||
| 
								 | 
							
								            for knowledge in (
							 | 
						||
| 
								 | 
							
								                db.query(Knowledge).order_by(Knowledge.updated_at.desc()).all()
							 | 
						||
| 
								 | 
							
								            ):
							 | 
						||
| 
								 | 
							
								                user = Users.get_user_by_id(knowledge.user_id)
							 | 
						||
| 
								 | 
							
								                knowledge_bases.append(
							 | 
						||
| 
								 | 
							
								                    KnowledgeUserModel.model_validate(
							 | 
						||
| 
								 | 
							
								                        {
							 | 
						||
| 
								 | 
							
								                            **KnowledgeModel.model_validate(knowledge).model_dump(),
							 | 
						||
| 
								 | 
							
								                            "user": user.model_dump() if user else None,
							 | 
						||
| 
								 | 
							
								                        }
							 | 
						||
| 
								 | 
							
								                    )
							 | 
						||
| 
								 | 
							
								                )
							 | 
						||
| 
								 | 
							
								            return knowledge_bases
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def get_knowledge_bases_by_user_id(
							 | 
						||
| 
								 | 
							
								        self, user_id: str, permission: str = "write"
							 | 
						||
| 
								 | 
							
								    ) -> list[KnowledgeUserModel]:
							 | 
						||
| 
								 | 
							
								        knowledge_bases = self.get_knowledge_bases()
							 | 
						||
| 
								 | 
							
								        return [
							 | 
						||
| 
								 | 
							
								            knowledge_base
							 | 
						||
| 
								 | 
							
								            for knowledge_base in knowledge_bases
							 | 
						||
| 
								 | 
							
								            if knowledge_base.user_id == user_id
							 | 
						||
| 
								 | 
							
								            or has_access(user_id, permission, knowledge_base.access_control)
							 | 
						||
| 
								 | 
							
								        ]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def get_knowledge_by_id(self, id: str) -> Optional[KnowledgeModel]:
							 | 
						||
| 
								 | 
							
								        try:
							 | 
						||
| 
								 | 
							
								            with get_db() as db:
							 | 
						||
| 
								 | 
							
								                knowledge = db.query(Knowledge).filter_by(id=id).first()
							 | 
						||
| 
								 | 
							
								                return KnowledgeModel.model_validate(knowledge) if knowledge else None
							 | 
						||
| 
								 | 
							
								        except Exception:
							 | 
						||
| 
								 | 
							
								            return None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def update_knowledge_by_id(
							 | 
						||
| 
								 | 
							
								        self, id: str, form_data: KnowledgeForm, overwrite: bool = False
							 | 
						||
| 
								 | 
							
								    ) -> Optional[KnowledgeModel]:
							 | 
						||
| 
								 | 
							
								        try:
							 | 
						||
| 
								 | 
							
								            with get_db() as db:
							 | 
						||
| 
								 | 
							
								                knowledge = self.get_knowledge_by_id(id=id)
							 | 
						||
| 
								 | 
							
								                db.query(Knowledge).filter_by(id=id).update(
							 | 
						||
| 
								 | 
							
								                    {
							 | 
						||
| 
								 | 
							
								                        **form_data.model_dump(),
							 | 
						||
| 
								 | 
							
								                        "updated_at": int(time.time()),
							 | 
						||
| 
								 | 
							
								                    }
							 | 
						||
| 
								 | 
							
								                )
							 | 
						||
| 
								 | 
							
								                db.commit()
							 | 
						||
| 
								 | 
							
								                return self.get_knowledge_by_id(id=id)
							 | 
						||
| 
								 | 
							
								        except Exception as e:
							 | 
						||
| 
								 | 
							
								            log.exception(e)
							 | 
						||
| 
								 | 
							
								            return None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def update_knowledge_data_by_id(
							 | 
						||
| 
								 | 
							
								        self, id: str, data: dict
							 | 
						||
| 
								 | 
							
								    ) -> Optional[KnowledgeModel]:
							 | 
						||
| 
								 | 
							
								        try:
							 | 
						||
| 
								 | 
							
								            with get_db() as db:
							 | 
						||
| 
								 | 
							
								                knowledge = self.get_knowledge_by_id(id=id)
							 | 
						||
| 
								 | 
							
								                db.query(Knowledge).filter_by(id=id).update(
							 | 
						||
| 
								 | 
							
								                    {
							 | 
						||
| 
								 | 
							
								                        "data": data,
							 | 
						||
| 
								 | 
							
								                        "updated_at": int(time.time()),
							 | 
						||
| 
								 | 
							
								                    }
							 | 
						||
| 
								 | 
							
								                )
							 | 
						||
| 
								 | 
							
								                db.commit()
							 | 
						||
| 
								 | 
							
								                return self.get_knowledge_by_id(id=id)
							 | 
						||
| 
								 | 
							
								        except Exception as e:
							 | 
						||
| 
								 | 
							
								            log.exception(e)
							 | 
						||
| 
								 | 
							
								            return None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def delete_knowledge_by_id(self, id: str) -> bool:
							 | 
						||
| 
								 | 
							
								        try:
							 | 
						||
| 
								 | 
							
								            with get_db() as db:
							 | 
						||
| 
								 | 
							
								                db.query(Knowledge).filter_by(id=id).delete()
							 | 
						||
| 
								 | 
							
								                db.commit()
							 | 
						||
| 
								 | 
							
								                return True
							 | 
						||
| 
								 | 
							
								        except Exception:
							 | 
						||
| 
								 | 
							
								            return False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def delete_all_knowledge(self) -> bool:
							 | 
						||
| 
								 | 
							
								        with get_db() as db:
							 | 
						||
| 
								 | 
							
								            try:
							 | 
						||
| 
								 | 
							
								                db.query(Knowledge).delete()
							 | 
						||
| 
								 | 
							
								                db.commit()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								                return True
							 | 
						||
| 
								 | 
							
								            except Exception:
							 | 
						||
| 
								 | 
							
								                return False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Knowledges = KnowledgeTable()
							 |