""" DaaS (Docker as a Service) is a service that allows users to run docker commands on the server side. """ from fastapi import FastAPI from fastapi.responses import StreamingResponse from fastapi import FastAPI, File, UploadFile, HTTPException from pydantic import BaseModel, Field from typing import Optional, Dict import time import os import asyncio import subprocess import uuid import threading import queue from shared_utils.docker_as_service_api import DockerServiceApiComModel app = FastAPI() def python_obj_to_pickle_file_bytes(obj): import pickle import io with io.BytesIO() as f: pickle.dump(obj, f) return f.getvalue() def yield_message(message): dsacm = DockerServiceApiComModel(server_message=message) return python_obj_to_pickle_file_bytes(dsacm) def read_output(stream, output_queue): while True: line_stdout = stream.readline() # print('recv') if line_stdout: output_queue.put(line_stdout) else: break async def stream_generator(request_obj): import tempfile # Create a temporary directory with tempfile.TemporaryDirectory() as temp_dir: # Construct the docker command download_folder = temp_dir # Get list of existing files before download existing_file_before_download = [] video_id = request_obj.client_command cmd = [ '/root/.dotnet/tools/BBDown', video_id, '--use-app-api', '--work-dir', f'{os.path.abspath(temp_dir)}' ] cmd = ' '.join(cmd) yield yield_message(cmd) process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=True) stdout_queue = queue.Queue() thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue)) thread.daemon = True thread.start() stderr_queue = queue.Queue() thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue)) thread.daemon = True thread.start() while True: print("looping") # Check if there is any output in the queue try: output_stdout = stdout_queue.get_nowait() # Non-blocking get if output_stdout: print(output_stdout) yield yield_message(output_stdout) output_stderr = stderr_queue.get_nowait() # Non-blocking get if output_stderr: print(output_stdout) yield yield_message(output_stderr) except queue.Empty: pass # No output available # Break the loop if the process has finished if process.poll() is not None: break await asyncio.sleep(0.25) # Get the return code return_code = process.returncode yield yield_message("(return code:) " + str(return_code)) # print(f"Successfully downloaded video {video_id}") existing_file_after_download = list(os.listdir(download_folder)) # get the difference downloaded_files = [ f for f in existing_file_after_download if f not in existing_file_before_download ] downloaded_files_path = [ os.path.join(download_folder, f) for f in existing_file_after_download if f not in existing_file_before_download ] # read file server_file_attach = {} for fp, fn in zip(downloaded_files_path, downloaded_files): with open(fp, "rb") as f: file_bytes = f.read() server_file_attach[fn] = file_bytes dsacm = DockerServiceApiComModel( server_message="complete", server_file_attach=server_file_attach, ) yield python_obj_to_pickle_file_bytes(dsacm) @app.post("/stream") async def stream_response(file: UploadFile = File(...)): # read the file in memory, treat it as pickle file, and unpickle it import pickle import io content = await file.read() with io.BytesIO(content) as f: request_obj = pickle.load(f) # process the request_obj return StreamingResponse(stream_generator(request_obj), media_type="application/octet-stream") @app.get("/") async def hi(): return "Hello, this is Docker as a Service (DaaS)!" if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=49000)