Spaces:
Running
Running
from fastapi import ( | |
Depends, | |
FastAPI, | |
File, | |
Form, | |
HTTPException, | |
Request, | |
UploadFile, | |
status, | |
APIRouter, | |
) | |
import os | |
import logging | |
import shutil | |
import requests | |
from pydantic import BaseModel | |
from starlette.responses import FileResponse | |
from typing import Optional | |
from open_webui.env import SRC_LOG_LEVELS | |
from open_webui.config import CACHE_DIR | |
from open_webui.constants import ERROR_MESSAGES | |
from open_webui.routers.openai import get_all_models_responses | |
from open_webui.utils.auth import get_admin_user | |
# Module-level logger; its verbosity is taken from the "MAIN" entry of
# SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])
################################## | |
# | |
# Pipeline Middleware | |
# | |
################################## | |
def get_sorted_filters(model_id, models):
    """Return the filter pipelines that apply to ``model_id``, by priority.

    A model entry counts as an applicable filter when its ``"pipeline"``
    dict has ``"type" == "filter"`` and its ``"pipelines"`` list is either
    exactly ``["*"]`` or contains ``model_id``.

    Args:
        model_id: Identifier of the model being served.
        models: Mapping of model id to model dict.

    Returns:
        A new list of the matching model dicts, ordered by ascending
        ``pipeline["priority"]``; entries with equal priority keep the
        iteration order of ``models``.
    """

    def _targets_model(candidate):
        # Entries without pipeline metadata, or of another type, never match.
        if "pipeline" not in candidate:
            return False
        spec = candidate["pipeline"]
        if "type" not in spec or spec["type"] != "filter":
            return False

        targets = spec["pipelines"]
        if targets == ["*"]:
            # Wildcard: the filter applies to every model.
            return True
        for target_model_id in targets:
            if model_id == target_model_id:
                return True
        return False

    def _priority(entry):
        return entry["pipeline"]["priority"]

    matching = [candidate for candidate in models.values() if _targets_model(candidate)]
    matching.sort(key=_priority)
    return matching
def process_pipeline_inlet_filter(request, payload, user, models):
    """Pass a request payload through every applicable inlet filter.

    The filters selected by ``get_sorted_filters`` run first, in priority
    order. If the target model itself carries a ``"pipeline"`` entry it is
    appended and runs last. Each step POSTs ``{"user": ..., "body": payload}``
    to ``{url}/{id}/filter/inlet`` and the JSON response becomes the payload
    for the next step. Filters whose configured API key is empty are skipped.

    Args:
        request: Object exposing ``app.state.config.OPENAI_API_BASE_URLS``
            and ``app.state.config.OPENAI_API_KEYS``, both indexed by each
            filter's ``"urlIdx"``.
        payload: Request body dict; must contain ``"model"``.
        user: Object with ``id``, ``email``, ``name`` and ``role`` attributes.
        models: Mapping of model id to model dict.

    Returns:
        The payload as returned by the last filter that succeeded, or the
        original payload when no filter ran or all of them failed without
        an error detail.

    Raises:
        KeyError: If ``payload["model"]`` is missing or not in ``models``.
        Exception: ``Exception(status_code, detail)`` when a filter answers
            with a JSON object containing ``"detail"``.
    """
    user = {"id": user.id, "email": user.email, "name": user.name, "role": user.role}
    model_id = payload["model"]

    sorted_filters = get_sorted_filters(model_id, models)
    model = models[model_id]
    if "pipeline" in model:
        sorted_filters.append(model)

    for pipeline_filter in sorted_filters:
        r = None
        try:
            urlIdx = pipeline_filter["urlIdx"]
            url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
            key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

            if key == "":
                continue

            r = requests.post(
                f"{url}/{pipeline_filter['id']}/filter/inlet",
                headers={"Authorization": f"Bearer {key}"},
                json={
                    "user": user,
                    "body": payload,
                },
            )
            r.raise_for_status()
            payload = r.json()
        except Exception as e:
            # Best effort: a failing filter is logged and skipped unless the
            # server explained the failure with a "detail" field.
            log.exception(f"Connection error: {e}")

            if r is None:
                continue
            try:
                res = r.json()
            except ValueError:
                # Non-JSON error body: nothing to report to the caller, and
                # it must not escape from inside this handler.
                continue
            if isinstance(res, dict) and "detail" in res:
                raise Exception(r.status_code, res["detail"]) from e

    return payload
def process_pipeline_outlet_filter(request, payload, user, models):
    """Pass a response payload through every applicable outlet filter.

    If the target model itself carries a ``"pipeline"`` entry it runs first,
    followed by the filters selected by ``get_sorted_filters`` in priority
    order. Each step POSTs ``{"user": ..., "body": payload}`` to
    ``{url}/{id}/filter/outlet`` and the JSON response becomes the payload
    for the next step. Filters whose configured API key is empty are skipped.

    Args:
        request: Object exposing ``app.state.config.OPENAI_API_BASE_URLS``
            and ``app.state.config.OPENAI_API_KEYS``, both indexed by each
            filter's ``"urlIdx"``.
        payload: Response body dict; must contain ``"model"``.
        user: Object with ``id``, ``email``, ``name`` and ``role`` attributes.
        models: Mapping of model id to model dict.

    Returns:
        The payload as returned by the last filter that succeeded, or the
        original payload when no filter ran or all of them failed without
        an error detail.

    Raises:
        KeyError: If ``payload["model"]`` is missing or not in ``models``.
        Exception: ``Exception(status_code, detail)`` when a filter answers
            with a JSON object containing ``"detail"``. This is raised (not
            returned) so that the result is always a payload, and it has the
            same shape as the error raised by the inlet counterpart.
    """
    user = {"id": user.id, "email": user.email, "name": user.name, "role": user.role}
    model_id = payload["model"]

    sorted_filters = get_sorted_filters(model_id, models)
    model = models[model_id]
    if "pipeline" in model:
        # On the way out the model's own pipeline goes ahead of the filters.
        sorted_filters = [model] + sorted_filters

    for pipeline_filter in sorted_filters:
        r = None
        try:
            urlIdx = pipeline_filter["urlIdx"]
            url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
            key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

            if key == "":
                continue

            r = requests.post(
                f"{url}/{pipeline_filter['id']}/filter/outlet",
                headers={"Authorization": f"Bearer {key}"},
                json={
                    "user": user,
                    "body": payload,
                },
            )
            r.raise_for_status()
            payload = r.json()
        except Exception as e:
            # Best effort: a failing filter is logged and skipped unless the
            # server explained the failure with a "detail" field.
            log.exception(f"Connection error: {e}")

            if r is None:
                continue
            try:
                res = r.json()
            except ValueError:
                # Non-JSON error body: nothing to report to the caller.
                continue
            if isinstance(res, dict) and "detail" in res:
                raise Exception(r.status_code, res["detail"]) from e

    return payload
################################## | |
# | |
# Pipelines Endpoints | |
# | |
################################## | |
# Router object for the pipelines endpoints defined in this module.
router = APIRouter()
async def get_pipelines_list(request: Request, user=Depends(get_admin_user)):
    """List the configured base URLs whose models response mentions pipelines.

    Awaits ``get_all_models_responses(request)`` and keeps every position
    whose response is not ``None`` and contains the key ``"pipelines"``.

    Returns:
        ``{"data": [{"url": <base url>, "idx": <position>}, ...]}`` in the
        order of the responses.
    """
    responses = await get_all_models_responses(request)
    log.debug(f"get_pipelines_list: get_openai_models_responses returned {responses}")

    entries = []
    for position, response in enumerate(responses):
        if response is None or "pipelines" not in response:
            continue
        entries.append(
            {
                "url": request.app.state.config.OPENAI_API_BASE_URLS[position],
                "idx": position,
            }
        )

    return {"data": entries}
async def upload_pipeline(
    request: Request,
    urlIdx: int = Form(...),
    file: UploadFile = File(...),
    user=Depends(get_admin_user),
):
    """Forward an uploaded Python pipeline file to a pipelines server.

    The upload is written to ``{CACHE_DIR}/pipelines``, POSTed as multipart
    form data to ``{base_url}/pipelines/upload`` on the server selected by
    ``urlIdx``, and the temporary copy is removed afterwards whether or not
    the request succeeded.

    Args:
        request: Incoming request; ``app.state.config`` supplies the base
            URLs and API keys.
        urlIdx: Index into ``OPENAI_API_BASE_URLS`` / ``OPENAI_API_KEYS``.
        file: The uploaded file; its name must end in ``.py``.
        user: Admin user resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict copy of the server's JSON response.

    Raises:
        HTTPException: 400 when the file is not a ``.py`` file. On any other
            failure, the server's status code and ``"detail"`` when a
            response was received, otherwise 404 with "Pipeline not found".
    """
    log.info(f"upload_pipeline: urlIdx={urlIdx}, filename={file.filename}")

    # The filename comes from the client: keep only its final component so
    # that names such as "../../x.py" cannot write outside upload_folder.
    filename = os.path.basename(file.filename or "")

    # Check if the uploaded file is a python file
    if not filename.endswith(".py"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only Python (.py) files are allowed.",
        )

    upload_folder = f"{CACHE_DIR}/pipelines"
    os.makedirs(upload_folder, exist_ok=True)
    file_path = os.path.join(upload_folder, filename)

    r = None
    try:
        # Save the uploaded file
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        with open(file_path, "rb") as f:
            r = requests.post(
                f"{url}/pipelines/upload",
                headers={"Authorization": f"Bearer {key}"},
                files={"file": f},
            )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Error body is not JSON; fall back to the generic message.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
    finally:
        # Ensure the file is deleted after the upload is completed or on failure
        if os.path.exists(file_path):
            os.remove(file_path)
# Request body accepted by add_pipeline.
class AddPipelineForm(BaseModel):
    # Forwarded as the "url" field of the JSON sent to the pipelines server.
    url: str
    # Index into OPENAI_API_BASE_URLS / OPENAI_API_KEYS selecting the server.
    urlIdx: int
async def add_pipeline(
    request: Request, form_data: AddPipelineForm, user=Depends(get_admin_user)
):
    """Ask a pipelines server to add a pipeline from a URL.

    POSTs ``{"url": form_data.url}`` to ``{base_url}/pipelines/add`` on the
    server selected by ``form_data.urlIdx``.

    Returns:
        A dict copy of the server's JSON response.

    Raises:
        HTTPException: On any failure. Carries the server's status code and
            ``"detail"`` when a response was received, otherwise 404 with
            "Pipeline not found".
    """
    r = None
    try:
        urlIdx = form_data.urlIdx

        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        r = requests.post(
            f"{url}/pipelines/add",
            headers={"Authorization": f"Bearer {key}"},
            json={"url": form_data.url},
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Error body is not JSON; fall back to the generic message.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
# Request body accepted by delete_pipeline.
class DeletePipelineForm(BaseModel):
    # Forwarded as the "id" field of the JSON sent to the pipelines server.
    id: str
    # Index into OPENAI_API_BASE_URLS / OPENAI_API_KEYS selecting the server.
    urlIdx: int
async def delete_pipeline(
    request: Request, form_data: DeletePipelineForm, user=Depends(get_admin_user)
):
    """Ask a pipelines server to delete a pipeline.

    Sends a DELETE with JSON body ``{"id": form_data.id}`` to
    ``{base_url}/pipelines/delete`` on the server selected by
    ``form_data.urlIdx``.

    Returns:
        A dict copy of the server's JSON response.

    Raises:
        HTTPException: On any failure. Carries the server's status code and
            ``"detail"`` when a response was received, otherwise 404 with
            "Pipeline not found".
    """
    r = None
    try:
        urlIdx = form_data.urlIdx

        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        r = requests.delete(
            f"{url}/pipelines/delete",
            headers={"Authorization": f"Bearer {key}"},
            json={"id": form_data.id},
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Error body is not JSON; fall back to the generic message.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
async def get_pipelines(
    request: Request, urlIdx: Optional[int] = None, user=Depends(get_admin_user)
):
    """Fetch the pipelines known to one pipelines server.

    GETs ``{base_url}/pipelines`` on the server selected by ``urlIdx``.

    Returns:
        A dict copy of the server's JSON response.

    Raises:
        HTTPException: On any failure. Carries the server's status code and
            ``"detail"`` when a response was received, otherwise 404 with
            "Pipeline not found".
    """
    r = None
    try:
        # NOTE(review): urlIdx defaults to None; assuming the configured URL
        # and key collections are lists, that lookup fails and is reported
        # through the generic 404 below - confirm this is intended.
        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        r = requests.get(f"{url}/pipelines", headers={"Authorization": f"Bearer {key}"})

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Error body is not JSON; fall back to the generic message.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
async def get_pipeline_valves(
    request: Request,
    urlIdx: Optional[int],
    pipeline_id: str,
    user=Depends(get_admin_user),
):
    """Fetch the valves of one pipeline.

    GETs ``{base_url}/{pipeline_id}/valves`` on the server selected by
    ``urlIdx``.

    Returns:
        A dict copy of the server's JSON response.

    Raises:
        HTTPException: On any failure. Carries the server's status code and
            ``"detail"`` when a response was received, otherwise 404 with
            "Pipeline not found".
    """
    r = None
    try:
        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        r = requests.get(
            f"{url}/{pipeline_id}/valves", headers={"Authorization": f"Bearer {key}"}
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Error body is not JSON; fall back to the generic message.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
async def get_pipeline_valves_spec(
    request: Request,
    urlIdx: Optional[int],
    pipeline_id: str,
    user=Depends(get_admin_user),
):
    """Fetch the valves specification of one pipeline.

    GETs ``{base_url}/{pipeline_id}/valves/spec`` on the server selected by
    ``urlIdx``.

    Returns:
        A dict copy of the server's JSON response.

    Raises:
        HTTPException: On any failure. Carries the server's status code and
            ``"detail"`` when a response was received, otherwise 404 with
            "Pipeline not found".
    """
    r = None
    try:
        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        r = requests.get(
            f"{url}/{pipeline_id}/valves/spec",
            headers={"Authorization": f"Bearer {key}"},
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Error body is not JSON; fall back to the generic message.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
async def update_pipeline_valves(
    request: Request,
    urlIdx: Optional[int],
    pipeline_id: str,
    form_data: dict,
    user=Depends(get_admin_user),
):
    """Send new valve values for one pipeline.

    POSTs a copy of ``form_data`` as JSON to
    ``{base_url}/{pipeline_id}/valves/update`` on the server selected by
    ``urlIdx``.

    Returns:
        A dict copy of the server's JSON response.

    Raises:
        HTTPException: On any failure. Carries the server's status code and
            ``"detail"`` when a response was received, otherwise 404 with
            "Pipeline not found".
    """
    r = None
    try:
        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        r = requests.post(
            f"{url}/{pipeline_id}/valves/update",
            headers={"Authorization": f"Bearer {key}"},
            json={**form_data},
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Error body is not JSON; fall back to the generic message.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e