|
""" |
|
DaaS (Docker as a Service) is a service |
|
that allows users to run docker commands on the server side. |
|
""" |
|
|
|
from fastapi import FastAPI |
|
from fastapi.responses import StreamingResponse |
|
from fastapi import FastAPI, File, UploadFile, HTTPException |
|
from pydantic import BaseModel, Field |
|
from typing import Optional, Dict |
|
import time |
|
import os |
|
import asyncio |
|
import subprocess |
|
import uuid |
|
import threading |
|
import queue |
|
from shared_utils.docker_as_service_api import DockerServiceApiComModel |
|
|
|
app = FastAPI() |
|
|
|
def python_obj_to_pickle_file_bytes(obj): |
|
import pickle |
|
import io |
|
with io.BytesIO() as f: |
|
pickle.dump(obj, f) |
|
return f.getvalue() |
|
|
|
def yield_message(message): |
|
dsacm = DockerServiceApiComModel(server_message=message) |
|
return python_obj_to_pickle_file_bytes(dsacm) |
|
|
|
def read_output(stream, output_queue): |
|
while True: |
|
line_stdout = stream.readline() |
|
|
|
if line_stdout: |
|
output_queue.put(line_stdout) |
|
else: |
|
break |
|
|
|
|
|
async def stream_generator(request_obj): |
|
import tempfile |
|
|
|
with tempfile.TemporaryDirectory() as temp_dir: |
|
|
|
|
|
download_folder = temp_dir |
|
|
|
|
|
existing_file_before_download = [] |
|
|
|
video_id = request_obj.client_command |
|
cmd = [ |
|
'/root/.dotnet/tools/BBDown', |
|
video_id, |
|
'--use-app-api', |
|
'--work-dir', |
|
f'{os.path.abspath(temp_dir)}' |
|
] |
|
cmd = ' '.join(cmd) |
|
yield yield_message(cmd) |
|
process = subprocess.Popen(cmd, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
shell=True, |
|
text=True) |
|
|
|
stdout_queue = queue.Queue() |
|
thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue)) |
|
thread.daemon = True |
|
thread.start() |
|
stderr_queue = queue.Queue() |
|
thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue)) |
|
thread.daemon = True |
|
thread.start() |
|
|
|
while True: |
|
print("looping") |
|
|
|
try: |
|
output_stdout = stdout_queue.get_nowait() |
|
if output_stdout: |
|
print(output_stdout) |
|
yield yield_message(output_stdout) |
|
|
|
output_stderr = stderr_queue.get_nowait() |
|
if output_stderr: |
|
print(output_stdout) |
|
yield yield_message(output_stderr) |
|
except queue.Empty: |
|
pass |
|
|
|
|
|
if process.poll() is not None: |
|
break |
|
|
|
await asyncio.sleep(0.25) |
|
|
|
|
|
return_code = process.returncode |
|
yield yield_message("(return code:) " + str(return_code)) |
|
|
|
|
|
existing_file_after_download = list(os.listdir(download_folder)) |
|
|
|
downloaded_files = [ |
|
f for f in existing_file_after_download if f not in existing_file_before_download |
|
] |
|
downloaded_files_path = [ |
|
os.path.join(download_folder, f) for f in existing_file_after_download if f not in existing_file_before_download |
|
] |
|
|
|
server_file_attach = {} |
|
for fp, fn in zip(downloaded_files_path, downloaded_files): |
|
with open(fp, "rb") as f: |
|
file_bytes = f.read() |
|
server_file_attach[fn] = file_bytes |
|
|
|
dsacm = DockerServiceApiComModel( |
|
server_message="complete", |
|
server_file_attach=server_file_attach, |
|
) |
|
yield python_obj_to_pickle_file_bytes(dsacm) |
|
|
|
|
|
@app.post("/stream") |
|
async def stream_response(file: UploadFile = File(...)): |
|
|
|
import pickle |
|
import io |
|
content = await file.read() |
|
with io.BytesIO(content) as f: |
|
request_obj = pickle.load(f) |
|
|
|
return StreamingResponse(stream_generator(request_obj), media_type="application/octet-stream") |
|
|
|
@app.get("/") |
|
async def hi(): |
|
return "Hello, this is Docker as a Service (DaaS)!" |
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
uvicorn.run(app, host="0.0.0.0", port=49000) |
|
|