|
from fastapi import APIRouter, HTTPException, status, BackgroundTasks, UploadFile, Query |
|
from .Schema import EditorRequest, TaskInfo |
|
from App.Worker import celery_task, concatenate_videos |
|
from celery.result import AsyncResult |
|
import aiofiles, os, uuid, aiohttp |
|
from App import SERVER_STATE, Task |
|
|
|
videditor_router = APIRouter(tags=["vidEditor"]) |
|
|
|
|
|
@videditor_router.post("/create-video") |
|
async def create_video(videoRequest: EditorRequest, background_task: BackgroundTasks): |
|
background_task.add_task(celery_task, videoRequest) |
|
return {"task_id": "started"} |
|
|
|
|
|
@videditor_router.post("/create-chunks") |
|
async def create_chunks(videoRequest: EditorRequest, background_task: BackgroundTasks): |
|
video_duration = videoRequest.constants.duration |
|
task_id = uuid.uuid4() |
|
new_task = Task(TASK_ID=task_id) |
|
|
|
active_nodes = [ |
|
node |
|
for node in SERVER_STATE.NODES |
|
if await new_task._check_node_online(node.SPACE_HOST) |
|
] |
|
number_of_nodes = len(active_nodes) |
|
ranges = [ |
|
[i, i + number_of_nodes] for i in range(0, video_duration, number_of_nodes) |
|
] |
|
for i, node in enumerate(active_nodes): |
|
await new_task.add_node(node, i) |
|
|
|
SERVER_STATE.TASKS[task_id] = new_task |
|
|
|
async with aiohttp.ClientSession() as session: |
|
for i, node in enumerate(active_nodes): |
|
videoRequest.constants.frames = ranges[i] |
|
if node.SPACE_HOST == SERVER_STATE.SPACE_HOST: |
|
background_task.add_task(celery_task, videoRequest) |
|
async with session.post( |
|
"node.SPACE_HOST/create-video", json=videoRequest |
|
) as response: |
|
if response.status != 200: |
|
raise HTTPException( |
|
status_code=response.status, |
|
detail="Failed to post request to node", |
|
) |
|
|
|
return {"task_id": "started"} |
|
|
|
|
|
@videditor_router.post("/uploadfile/") |
|
async def create_file( |
|
background_tasks: BackgroundTasks, |
|
file: UploadFile, |
|
node: str, |
|
chunk: int, |
|
task: str, |
|
): |
|
|
|
chunk_directory = f"/tmp/Video/{task}" |
|
file_name = f"{chunk_directory}/{chunk}.mp4" |
|
|
|
os.makedirs(chunk_directory, exist_ok=True) |
|
|
|
try: |
|
async with aiofiles.open(file_name, "wb") as f: |
|
while contents := await file.read(1024 * 1): |
|
await f.write(contents) |
|
|
|
except Exception as e: |
|
return { |
|
"message": f"There was an error uploading the file, error message {str(e)} " |
|
} |
|
finally: |
|
await file.close() |
|
running_task = SERVER_STATE.TASKS[task] |
|
running_task.mark_node_completed(node) |
|
if running_task.is_completed(): |
|
background_tasks.add_task(concatenate_videos, chunk_directory) |
|
|
|
return {"message": "File uploaded successfully"} |
|
|