Spaces:
Running
Running
import logging | |
import time | |
import uuid | |
from typing import Optional | |
from open_webui.internal.db import Base, get_db | |
from open_webui.models.chats import Chats | |
from open_webui.env import SRC_LOG_LEVELS | |
from pydantic import BaseModel, ConfigDict | |
from sqlalchemy import BigInteger, Column, Text, JSON, Boolean | |
log = logging.getLogger(__name__) | |
log.setLevel(SRC_LOG_LEVELS["MODELS"]) | |
#################### | |
# Folder DB Schema | |
#################### | |
class Folder(Base): | |
__tablename__ = "folder" | |
id = Column(Text, primary_key=True) | |
parent_id = Column(Text, nullable=True) | |
user_id = Column(Text) | |
name = Column(Text) | |
items = Column(JSON, nullable=True) | |
meta = Column(JSON, nullable=True) | |
is_expanded = Column(Boolean, default=False) | |
created_at = Column(BigInteger) | |
updated_at = Column(BigInteger) | |
class FolderModel(BaseModel): | |
id: str | |
parent_id: Optional[str] = None | |
user_id: str | |
name: str | |
items: Optional[dict] = None | |
meta: Optional[dict] = None | |
is_expanded: bool = False | |
created_at: int | |
updated_at: int | |
model_config = ConfigDict(from_attributes=True) | |
#################### | |
# Forms | |
#################### | |
class FolderForm(BaseModel): | |
name: str | |
model_config = ConfigDict(extra="allow") | |
class FolderTable: | |
def insert_new_folder( | |
self, user_id: str, name: str, parent_id: Optional[str] = None | |
) -> Optional[FolderModel]: | |
with get_db() as db: | |
id = str(uuid.uuid4()) | |
folder = FolderModel( | |
**{ | |
"id": id, | |
"user_id": user_id, | |
"name": name, | |
"parent_id": parent_id, | |
"created_at": int(time.time()), | |
"updated_at": int(time.time()), | |
} | |
) | |
try: | |
result = Folder(**folder.model_dump()) | |
db.add(result) | |
db.commit() | |
db.refresh(result) | |
if result: | |
return FolderModel.model_validate(result) | |
else: | |
return None | |
except Exception as e: | |
print(e) | |
return None | |
def get_folder_by_id_and_user_id( | |
self, id: str, user_id: str | |
) -> Optional[FolderModel]: | |
try: | |
with get_db() as db: | |
folder = db.query(Folder).filter_by(id=id, user_id=user_id).first() | |
if not folder: | |
return None | |
return FolderModel.model_validate(folder) | |
except Exception: | |
return None | |
def get_children_folders_by_id_and_user_id( | |
self, id: str, user_id: str | |
) -> Optional[FolderModel]: | |
try: | |
with get_db() as db: | |
folders = [] | |
def get_children(folder): | |
children = self.get_folders_by_parent_id_and_user_id( | |
folder.id, user_id | |
) | |
for child in children: | |
get_children(child) | |
folders.append(child) | |
folder = db.query(Folder).filter_by(id=id, user_id=user_id).first() | |
if not folder: | |
return None | |
get_children(folder) | |
return folders | |
except Exception: | |
return None | |
def get_folders_by_user_id(self, user_id: str) -> list[FolderModel]: | |
with get_db() as db: | |
return [ | |
FolderModel.model_validate(folder) | |
for folder in db.query(Folder).filter_by(user_id=user_id).all() | |
] | |
def get_folder_by_parent_id_and_user_id_and_name( | |
self, parent_id: Optional[str], user_id: str, name: str | |
) -> Optional[FolderModel]: | |
try: | |
with get_db() as db: | |
# Check if folder exists | |
folder = ( | |
db.query(Folder) | |
.filter_by(parent_id=parent_id, user_id=user_id) | |
.filter(Folder.name.ilike(name)) | |
.first() | |
) | |
if not folder: | |
return None | |
return FolderModel.model_validate(folder) | |
except Exception as e: | |
log.error(f"get_folder_by_parent_id_and_user_id_and_name: {e}") | |
return None | |
def get_folders_by_parent_id_and_user_id( | |
self, parent_id: Optional[str], user_id: str | |
) -> list[FolderModel]: | |
with get_db() as db: | |
return [ | |
FolderModel.model_validate(folder) | |
for folder in db.query(Folder) | |
.filter_by(parent_id=parent_id, user_id=user_id) | |
.all() | |
] | |
def update_folder_parent_id_by_id_and_user_id( | |
self, | |
id: str, | |
user_id: str, | |
parent_id: str, | |
) -> Optional[FolderModel]: | |
try: | |
with get_db() as db: | |
folder = db.query(Folder).filter_by(id=id, user_id=user_id).first() | |
if not folder: | |
return None | |
folder.parent_id = parent_id | |
folder.updated_at = int(time.time()) | |
db.commit() | |
return FolderModel.model_validate(folder) | |
except Exception as e: | |
log.error(f"update_folder: {e}") | |
return | |
def update_folder_name_by_id_and_user_id( | |
self, id: str, user_id: str, name: str | |
) -> Optional[FolderModel]: | |
try: | |
with get_db() as db: | |
folder = db.query(Folder).filter_by(id=id, user_id=user_id).first() | |
if not folder: | |
return None | |
existing_folder = ( | |
db.query(Folder) | |
.filter_by(name=name, parent_id=folder.parent_id, user_id=user_id) | |
.first() | |
) | |
if existing_folder: | |
return None | |
folder.name = name | |
folder.updated_at = int(time.time()) | |
db.commit() | |
return FolderModel.model_validate(folder) | |
except Exception as e: | |
log.error(f"update_folder: {e}") | |
return | |
def update_folder_is_expanded_by_id_and_user_id( | |
self, id: str, user_id: str, is_expanded: bool | |
) -> Optional[FolderModel]: | |
try: | |
with get_db() as db: | |
folder = db.query(Folder).filter_by(id=id, user_id=user_id).first() | |
if not folder: | |
return None | |
folder.is_expanded = is_expanded | |
folder.updated_at = int(time.time()) | |
db.commit() | |
return FolderModel.model_validate(folder) | |
except Exception as e: | |
log.error(f"update_folder: {e}") | |
return | |
def delete_folder_by_id_and_user_id(self, id: str, user_id: str) -> bool: | |
try: | |
with get_db() as db: | |
folder = db.query(Folder).filter_by(id=id, user_id=user_id).first() | |
if not folder: | |
return False | |
# Delete all chats in the folder | |
Chats.delete_chats_by_user_id_and_folder_id(user_id, folder.id) | |
# Delete all children folders | |
def delete_children(folder): | |
folder_children = self.get_folders_by_parent_id_and_user_id( | |
folder.id, user_id | |
) | |
for folder_child in folder_children: | |
Chats.delete_chats_by_user_id_and_folder_id( | |
user_id, folder_child.id | |
) | |
delete_children(folder_child) | |
folder = db.query(Folder).filter_by(id=folder_child.id).first() | |
db.delete(folder) | |
db.commit() | |
delete_children(folder) | |
db.delete(folder) | |
db.commit() | |
return True | |
except Exception as e: | |
log.error(f"delete_folder: {e}") | |
return False | |
Folders = FolderTable() | |