bbdown-4 / docker_as_a_service /docker_as_a_service_host.py
qingxu98's picture
improve multi p
c7d9c76
"""
DaaS (Docker as a Service) is a service
that allows users to run docker commands on the server side.
"""
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, Dict
import time
import os
import asyncio
import subprocess
import uuid
import glob
import threading
import queue
from shared_utils.docker_as_service_api import DockerServiceApiComModel
# ASGI application object; the @app.post / @app.get decorators further down
# in this file register their handlers on it, and uvicorn serves it.
app = FastAPI()
def python_obj_to_pickle_file_bytes(obj):
    """Serialize *obj* with pickle and return the resulting bytes."""
    import pickle

    serialized = pickle.dumps(obj)
    return serialized
def yield_message(message):
    """Wrap *message* as the ``server_message`` of a DockerServiceApiComModel
    and return the pickled model as bytes."""
    return python_obj_to_pickle_file_bytes(
        DockerServiceApiComModel(server_message=message)
    )
def read_output(stream, output_queue):
    """Pump lines from *stream* into *output_queue* until the stream runs dry.

    The loop ends the first time ``stream.readline()`` returns a falsy value
    (end of stream); that final empty value is not queued.
    """
    while chunk := stream.readline():
        output_queue.put(chunk)
def _drain_queue(output_queue):
    """Return everything currently buffered in *output_queue* as one string.

    Never blocks: stops as soon as the queue reports empty. Each non-empty
    line is also echoed to the server console.
    """
    chunks = []
    while True:
        try:
            line = output_queue.get_nowait()
        except queue.Empty:
            break
        if line:
            chunks.append(line)
            print(line)
    return "".join(chunks)


def _complete_message(server_file_attach):
    """Build the pickled terminal "complete" message carrying *server_file_attach*."""
    return python_obj_to_pickle_file_bytes(
        DockerServiceApiComModel(
            server_message="complete",
            server_file_attach=server_file_attach,
        )
    )


async def stream_generator(request_obj):
    """Run BBDown in a Docker container and stream its progress to the client.

    Every yielded item is a pickled ``DockerServiceApiComModel``, in this order:

    1. the command line that is about to run;
    2. on every polling round (about 0.5 s apart) one message with the stdout
       collected since the previous round, then one with the stderr (either
       may be an empty string);
    3. any output that was still buffered when the process exited;
    4. ``"(daas return code:) <code>"``;
    5. a final ``"complete"`` message whose ``server_file_attach`` maps the
       server-side path of each downloaded file to its bytes.

    If the command cannot be parsed or started, an error message is yielded
    followed by a ``"complete"`` message with no attachments.

    Args:
        request_obj: Object whose ``client_command`` attribute holds the video
            id/URL, optionally followed by further BBDown arguments.
    """
    import shlex
    import tempfile

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
        download_folder = temp_dir
        volume = f'{os.path.abspath(temp_dir)}:/downloads'

        # client_command comes from the HTTP client. Split it into argv tokens
        # ourselves instead of handing a joined string to a shell, so shell
        # metacharacters (;, |, $(), &, ...) are never interpreted.
        try:
            client_args = shlex.split(str(request_obj.client_command))
        except ValueError as exc:  # e.g. unbalanced quotes
            yield yield_message(f"(daas error:) invalid client command: {exc}")
            yield _complete_message({})
            return

        cmd = [
            'docker', 'run', '--rm',
            '-v', volume,
            'bbdown',
            *client_args,
            '--use-app-api',
            '--work-dir', '/downloads',
        ]
        cmd_chmod = [
            'docker', 'run', '--rm',
            '-v', volume,
            '--entrypoint=',  # empty value overrides the image entrypoint
            'bbdown',  # image name
            'chmod', '-R', '777', '/downloads',
        ]

        yield yield_message(' '.join(cmd))

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except OSError as exc:  # e.g. the docker executable is missing
            yield yield_message(f"(daas error:) failed to start command: {exc}")
            yield _complete_message({})
            return

        # One reader thread per pipe: readline() blocks, so the reads must
        # happen off the event loop; the queues hand lines back to this task.
        stdout_queue = queue.Queue()
        stderr_queue = queue.Queue()
        readers = []
        for pipe, sink in ((process.stdout, stdout_queue), (process.stderr, stderr_queue)):
            reader = threading.Thread(target=read_output, args=(pipe, sink), daemon=True)
            reader.start()
            readers.append(reader)

        try:
            while True:
                print("looping")
                yield yield_message(_drain_queue(stdout_queue))
                yield yield_message(_drain_queue(stderr_queue))
                # Break the loop if the process has finished
                if process.poll() is not None:
                    break
                await asyncio.sleep(0.5)
        finally:
            # Reached with a live process only when the stream is torn down
            # early (client disconnect / cancellation): don't leave it running.
            if process.poll() is None:
                process.terminate()

        # The process can emit output between the last drain and its exit.
        # Wait for the readers to reach end-of-stream, then flush what is left.
        # The timeout is a safety net so a stuck pipe cannot hang the response.
        for reader in readers:
            reader.join(timeout=5)
        for pending in (stdout_queue, stderr_queue):
            leftover = _drain_queue(pending)
            if leftover:
                yield yield_message(leftover)

        yield yield_message("(daas return code:) " + str(process.returncode))

        # Files written by the container may not be readable by this process;
        # open up the permissions before reading them back.
        subprocess.call(cmd_chmod)

        # recursive=True lets '**' match zero or more directories, so files
        # saved directly in the work dir AND files inside sub-folders are
        # both collected. Keys are the server-side paths returned by glob.
        server_file_attach = {}
        pattern = os.path.join(download_folder, '**', '*')
        for path in glob.glob(pattern, recursive=True):
            if os.path.isdir(path):
                continue
            with open(path, "rb") as f:
                server_file_attach[path] = f.read()

        yield _complete_message(server_file_attach)
def simple_generator(return_obj):
    """Yield exactly one item: a pickled DockerServiceApiComModel whose
    ``server_message`` is *return_obj*."""
    payload = python_obj_to_pickle_file_bytes(
        DockerServiceApiComModel(server_message=return_obj)
    )
    yield payload
def _load_request_obj(content):
    """Unpickle the raw bytes of an uploaded request.

    SECURITY NOTE: unpickling can execute arbitrary code, and ``content``
    comes straight from the HTTP client. Pickle is the wire format of this
    service (responses are pickled as well), so it is kept here; only expose
    the service to callers you trust.

    Args:
        content: Bytes read from the uploaded file.

    Returns:
        The unpickled request object.

    Raises:
        HTTPException: status 400 when the payload cannot be unpickled
            (empty, truncated or otherwise malformed upload).
    """
    import pickle

    try:
        return pickle.loads(content)
    except (pickle.UnpicklingError, AttributeError, EOFError, ImportError, IndexError) as exc:
        raise HTTPException(status_code=400, detail=f"invalid request payload: {exc}") from exc


@app.post("/stream")
async def stream_response(file: UploadFile = File(...)):
    """Handle POST /stream.

    The uploaded file is read into memory and unpickled into a request
    object, which is handed to ``stream_generator``; the generator's output
    is streamed back as ``application/octet-stream``.
    """
    request_obj = _load_request_obj(await file.read())
    return StreamingResponse(stream_generator(request_obj), media_type="application/octet-stream")


# NOTE: this handler reuses the name ``stream_response``; both routes are
# registered by their decorators, and the name is kept unchanged on purpose.
@app.post("/search")
async def stream_response(file: UploadFile = File(...)):
    """Handle POST /search.

    The uploaded file is read into memory and unpickled; its
    ``client_command`` is used as the search keyword. The value returned by
    ``search_videos`` is sent back as a single pickled message.
    """
    request_obj = _load_request_obj(await file.read())
    keyword = request_obj.client_command

    from experimental_mods.get_search_kw_api_stop import search_videos

    # Default parameters for video search
    videos = search_videos(
        keyword=keyword,
        csrf_token='40a227fcf12c380d7d3c81af2cd8c5e8',  # Using default from main()
        search_type='default',
        max_pages=1,
        output_path='search_results',
        config_path='experimental_mods/config.json',
        early_stop=True,
    )
    return StreamingResponse(simple_generator(videos), media_type="application/octet-stream")
@app.get("/")
async def hi():
    """Handle GET /: return a fixed greeting that tells the visitor to
    duplicate the space and how to point DAAS_SERVER_URL at the copy."""
    greeting = (
        "Hello, this is Docker as a Service (DaaS)! If you want to use this service, you must duplicate this space. "
        "您好,这里是Docker作为服务(DaaS)!如果您想使用此服务,您必须复制此空间。复制方法:点击https://huggingface.co/spaces/hamercity/bbdown页面右上角的三个点,然后选择“复制空间”。"
        "此外,在设置中,你还需要修改URL,例如:DAAS_SERVER_URL = \"https://你的用户名-你的空间名.hf.space/stream\""
    )
    return greeting
if __name__ == "__main__":
    # Script entry point: serve the app on every interface, port 49000.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=49000)