# Source path: bbdown-3 / docker_as_a_service / docker_as_a_service.py
# Repository listing metadata: uploader qingxu98, commit message "upload",
# revision 312898d, file size 4.64 kB.
"""
DaaS (Docker as a Service) is a service
that allows users to run docker commands on the server side.
"""
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, Dict
import time
import os
import asyncio
import subprocess
import uuid
import threading
import queue
from shared_utils.docker_as_service_api import DockerServiceApiComModel
# FastAPI application object; the route decorators further down register
# their handlers on it, and the __main__ guard passes it to uvicorn.
app = FastAPI()
def python_obj_to_pickle_file_bytes(obj):
    """Serialize ``obj`` with pickle and return the resulting bytes.

    The output is the same byte string that would be written to a pickle
    file for ``obj``.
    """
    import pickle

    return pickle.dumps(obj)
def yield_message(message):
    """Wrap ``message`` in a DockerServiceApiComModel and return it pickled.

    The model is built with ``server_message=message`` and serialized via
    ``python_obj_to_pickle_file_bytes``.
    """
    return python_obj_to_pickle_file_bytes(
        DockerServiceApiComModel(server_message=message)
    )
def read_output(stream, output_queue):
    """Copy lines from ``stream`` into ``output_queue`` until end of stream.

    ``stream.readline()`` is called repeatedly; every non-empty result is
    put on the queue, and the first empty result (end of stream) ends the
    function.
    """
    line = stream.readline()
    while line:
        output_queue.put(line)
        line = stream.readline()
def _drain_queue(output_queue):
    """Return every item currently waiting in ``output_queue`` without blocking."""
    pending = []
    while True:
        try:
            pending.append(output_queue.get_nowait())
        except queue.Empty:
            return pending


def _start_pipe_reader(stream):
    """Start a daemon thread feeding lines of ``stream`` into a new queue.

    Returns the ``(thread, queue)`` pair.
    """
    output_queue = queue.Queue()
    reader = threading.Thread(target=read_output, args=(stream, output_queue))
    reader.daemon = True
    reader.start()
    return reader, output_queue


async def stream_generator(request_obj):
    """Run BBDown for the requested video and stream progress to the client.

    Every yielded item is a pickled ``DockerServiceApiComModel`` (bytes):

    1. the command line that is about to run,
    2. each line BBDown writes to stdout or stderr, relayed while it runs,
    3. ``"(return code:) <code>"`` once the process has ended,
    4. a final ``"complete"`` message whose ``server_file_attach`` maps each
       file name found in the temporary work directory to its bytes.

    Args:
        request_obj: Object whose ``client_command`` attribute (a string)
            holds the video identifier handed to BBDown.

    Yields:
        bytes: One pickled ``DockerServiceApiComModel`` per message.
    """
    import shlex
    import tempfile

    bbdown = '/root/.dotnet/tools/BBDown'
    poll_interval = 0.25  # seconds between checks for new output
    reader_grace = 1.0  # seconds to wait for a reader thread to reach EOF

    with tempfile.TemporaryDirectory() as temp_dir:
        download_folder = os.path.abspath(temp_dir)
        video_id = request_obj.client_command
        fixed_args = ['--use-app-api', '--work-dir', download_folder]

        # Echo the command exactly as requested, before anything is executed.
        yield yield_message(' '.join([bbdown, video_id, *fixed_args]))

        # Tokenize the client string ourselves and run without a shell, so
        # metacharacters such as ';', '&', '|' or '$(...)' are passed to BBDown
        # as plain text and are never interpreted as shell syntax.
        # NOTE(review): client tokens may still be BBDown options; restrict
        # them here if clients must only be able to send a bare video id.
        try:
            client_args = shlex.split(video_id)
        except ValueError:
            # Unbalanced quotes: pass the raw string through as one argument.
            client_args = [video_id]
        cmd = [bbdown, *client_args, *fixed_args]

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                shell=False,
                text=True,
                # An undecodable byte must not kill a reader thread: the pipe
                # would stop being drained and the child could block on it.
                errors="replace",
            )
        except OSError as exc:
            # Without a shell a missing or non-executable binary raises here.
            # Report it and use the status a shell would have returned.
            yield yield_message(f"failed to start BBDown: {exc}")
            return_code = 127 if isinstance(exc, FileNotFoundError) else 126
        else:
            try:
                stdout_reader, stdout_queue = _start_pipe_reader(process.stdout)
                stderr_reader, stderr_queue = _start_pipe_reader(process.stderr)

                while process.poll() is None:
                    # Drain both queues independently so stderr is relayed
                    # even while stdout is silent, and a burst of lines is
                    # not throttled to one line per poll interval.
                    for line in _drain_queue(stdout_queue) + _drain_queue(stderr_queue):
                        print(line)
                        yield yield_message(line)
                    await asyncio.sleep(poll_interval)

                # The process has ended; let the readers consume what is left
                # in the pipes (bounded wait, off the event loop), then relay
                # the remaining lines so the tail of the output is not lost.
                loop = asyncio.get_running_loop()
                for reader in (stdout_reader, stderr_reader):
                    await loop.run_in_executor(None, reader.join, reader_grace)
                for line in _drain_queue(stdout_queue) + _drain_queue(stderr_queue):
                    print(line)
                    yield yield_message(line)
            finally:
                # Reached with the process still alive when the consumer goes
                # away (generator closed or cancelled): do not leave it running
                # against a work directory that is about to be deleted.
                if process.poll() is None:
                    process.kill()
                    process.wait()
            return_code = process.returncode

        yield yield_message("(return code:) " + str(return_code))

        # The work directory starts empty, so every entry in it is a download.
        server_file_attach = {}
        for entry in os.listdir(download_folder):
            entry_path = os.path.join(download_folder, entry)
            if not os.path.isfile(entry_path):
                # open() would raise on a directory; report it and carry on.
                yield yield_message(f"skipped non-file entry: {entry}")
                continue
            with open(entry_path, "rb") as f:
                server_file_attach[entry] = f.read()

        dsacm = DockerServiceApiComModel(
            server_message="complete",
            server_file_attach=server_file_attach,
        )
        yield python_obj_to_pickle_file_bytes(dsacm)
@app.post("/stream")
async def stream_response(file: UploadFile = File(...)):
    """Unpickle the uploaded request object and stream the job output back.

    The upload is read fully into memory and unpickled; the resulting object
    is handed to ``stream_generator`` and its messages are returned as an
    ``application/octet-stream`` streaming response.

    Raises:
        HTTPException: 400 when the upload cannot be unpickled.
    """
    import pickle

    content = await file.read()
    try:
        # SECURITY(review): unpickling executes code chosen by whoever built
        # the payload, so any client that can reach this endpoint can run
        # arbitrary code on the server. Only expose it to trusted clients, or
        # move the protocol to a data-only format such as JSON.
        request_obj = pickle.loads(content)
    except Exception as exc:
        # Unpickling bad data can raise many exception types; at this request
        # boundary they all mean "bad request", not an internal server error.
        raise HTTPException(
            status_code=400,
            detail="uploaded file is not a valid pickled request object",
        ) from exc
    return StreamingResponse(
        stream_generator(request_obj),
        media_type="application/octet-stream",
    )
@app.get("/")
async def hi():
    """Return a fixed greeting string for GET requests to the root path."""
    greeting = "Hello, this is Docker as a Service (DaaS)!"
    return greeting
if __name__ == "__main__":
    import uvicorn

    # Serve the app on all network interfaces, port 49000, when this file is
    # executed as a script.
    server_options = {"host": "0.0.0.0", "port": 49000}
    uvicorn.run(app, **server_options)