"""
DaaS (Docker as a Service) is a service that allows users to run docker commands on the server side.

Endpoints:
    POST /stream  -- run BBDown for the pickled request's ``client_command`` and
                     stream pickled ``DockerServiceApiComModel`` progress messages,
                     ending with a "complete" message that carries the downloaded files.
    POST /search  -- run a video search for the pickled request's ``client_command``.
    GET  /        -- usage notice.
"""

import asyncio
import glob
import io
import os
import pickle
import queue
import shlex
import subprocess
import tempfile
import threading
import time
import uuid
from typing import Optional, Dict

from fastapi import FastAPI
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

from shared_utils.docker_as_service_api import DockerServiceApiComModel

app = FastAPI()

# Absolute path of the downloader executable launched by /stream.
BBDOWN_EXECUTABLE = '/root/.dotnet/tools/BBDown'
# Seconds between two polls of the subprocess output queues.
POLL_INTERVAL_SECONDS = 0.5
# Upper bound on how long we wait for the pipe readers to hit EOF after exit.
READER_FLUSH_TIMEOUT_SECONDS = 5.0
# Return code reported when the command could not be started at all
# (the value a shell reports for "command not found").
LAUNCH_FAILURE_RETURN_CODE = 127


def python_obj_to_pickle_file_bytes(obj):
    """Serialize *obj* with pickle and return the resulting bytes."""
    return pickle.dumps(obj)


def yield_message(message):
    """Wrap *message* in a DockerServiceApiComModel and return it pickled."""
    dsacm = DockerServiceApiComModel(server_message=message)
    return python_obj_to_pickle_file_bytes(dsacm)


def read_output(stream, output_queue):
    """Copy lines from *stream* into *output_queue* until the stream hits EOF.

    Intended to run in a worker thread, because ``readline`` blocks.
    """
    while True:
        line = stream.readline()
        if not line:
            break
        output_queue.put(line)


def _drain_queue(output_queue):
    """Return everything currently queued, concatenated, without blocking."""
    chunks = []
    while True:
        try:
            chunk = output_queue.get_nowait()
        except queue.Empty:
            break
        if chunk:
            chunks.append(chunk)
            print(chunk)
    return "".join(chunks)


def _start_reader(stream, output_queue):
    """Start a daemon thread that pumps *stream* into *output_queue*."""
    reader = threading.Thread(target=read_output, args=(stream, output_queue))
    reader.daemon = True
    reader.start()
    return reader


def _build_bbdown_command(client_command, work_dir):
    """Build the BBDown argument list for *client_command*.

    The client text is tokenised with shell-like quoting rules but is never
    handed to a shell, so metacharacters such as ``;``, ``&`` or ``$(...)``
    stay literal arguments instead of being executed.

    Raises:
        TypeError: if *client_command* is not a string.
    """
    if not isinstance(client_command, str):
        raise TypeError("client_command must be a string")
    try:
        client_args = shlex.split(client_command)
    except ValueError:
        # Unbalanced quotes: pass the raw text through as a single argument.
        client_args = [client_command]
    return [
        BBDOWN_EXECUTABLE,
        *client_args,
        '--use-app-api',
        '--work-dir',
        os.path.abspath(work_dir),
    ]


def _collect_downloaded_files(download_folder):
    """Read every regular file below *download_folder* into a dict.

    Keys are the paths as returned by ``glob``; values are the file contents.
    ``recursive=True`` is required for ``**`` to match zero or more directory
    levels; without it only files exactly one directory deep are found.
    """
    found = glob.glob(os.path.join(download_folder, '**', '*'), recursive=True)
    print("downloaded_files")
    print(found)
    attachments = {}
    for path in found:
        if os.path.isdir(path):
            continue
        with open(path, "rb") as f:
            attachments[path] = f.read()
    return attachments


async def stream_generator(request_obj):
    """Run BBDown for ``request_obj.client_command`` and stream its progress.

    Yields pickled DockerServiceApiComModel objects: first the command line,
    then one stdout and one stderr message per poll round (possibly empty),
    then the return code, and finally a "complete" message whose
    ``server_file_attach`` holds the downloaded files.
    """
    print('create temp dir')
    # The download directory is fresh and private to this request, so every
    # file found in it afterwards was produced by this run.
    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
        download_folder = temp_dir
        cmd = _build_bbdown_command(request_obj.client_command, temp_dir)
        yield yield_message(' '.join(cmd))

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # An undecodable byte must not kill the reader thread.
                errors="replace",
            )
        except OSError as exc:
            # Without a shell a missing/non-executable binary raises here;
            # report it in-band so the client still gets a well-formed stream.
            process = None
            return_code = LAUNCH_FAILURE_RETURN_CODE
            yield yield_message("(daas failed to start command:) " + str(exc))

        if process is not None:
            stdout_queue = queue.Queue()
            stderr_queue = queue.Queue()
            readers = [
                _start_reader(process.stdout, stdout_queue),
                _start_reader(process.stderr, stderr_queue),
            ]
            try:
                while True:
                    print("looping")
                    # Forward whatever arrived since the previous round.
                    yield yield_message(_drain_queue(stdout_queue))
                    yield yield_message(_drain_queue(stderr_queue))
                    # Break the loop if the process has finished
                    if process.poll() is not None:
                        break
                    await asyncio.sleep(POLL_INTERVAL_SECONDS)

                # Output written between the last drain and process exit is
                # still in flight: give the readers a bounded time to reach
                # EOF, then forward the tail.
                deadline = time.monotonic() + READER_FLUSH_TIMEOUT_SECONDS
                while (any(reader.is_alive() for reader in readers)
                       and time.monotonic() < deadline):
                    await asyncio.sleep(0.05)
                stdout_tail = _drain_queue(stdout_queue)
                if stdout_tail:
                    yield yield_message(stdout_tail)
                stderr_tail = _drain_queue(stderr_queue)
                if stderr_tail:
                    yield yield_message(stderr_tail)
            finally:
                # If the consumer goes away mid-download, do not leave the
                # downloader running against a directory about to be deleted.
                if process.poll() is None:
                    process.kill()
                    process.wait()
            return_code = process.returncode

        print("(daas return) ")
        yield yield_message("(daas return code:) " + str(return_code))
        print("(daas return code:) " + str(return_code))

        server_file_attach = _collect_downloaded_files(download_folder)
        print("downloaded_files")
        print(list(server_file_attach))
        dsacm = DockerServiceApiComModel(
            server_message="complete",
            server_file_attach=server_file_attach,
        )
        print("sending files")
        yield python_obj_to_pickle_file_bytes(dsacm)


def simple_generator(return_obj):
    """Yield a single pickled DockerServiceApiComModel carrying *return_obj*."""
    dsacm = DockerServiceApiComModel(
        server_message=return_obj,
    )
    yield python_obj_to_pickle_file_bytes(dsacm)


async def _load_request(file):
    """Read the uploaded file and unpickle it into the request object.

    SECURITY: unpickling can execute arbitrary code chosen by whoever built
    the payload. The wire protocol is pickle-based, so this is kept, but the
    service must only be reachable by trusted clients.

    Raises:
        HTTPException: 400 if the payload cannot be unpickled.
    """
    content = await file.read()
    try:
        with io.BytesIO(content) as f:
            return pickle.load(f)
    except Exception as exc:  # unpickling may raise almost any exception type
        raise HTTPException(status_code=400, detail="invalid pickle payload") from exc


@app.post("/stream")
async def stream_response(file: UploadFile = File(...)):
    """Unpickle the uploaded request and stream the download progress back."""
    request_obj = await _load_request(file)
    return StreamingResponse(stream_generator(request_obj), media_type="application/octet-stream")


@app.post("/search")
async def search_response(file: UploadFile = File(...)):
    """Unpickle the uploaded request and return search results for its keyword."""
    request_obj = await _load_request(file)
    keyword = request_obj.client_command

    # Imported lazily, as in the original handler.
    from experimental_mods.get_search_kw_api_stop import search_videos

    # Default parameters for video search
    csrf_token = '40a227fcf12c380d7d3c81af2cd8c5e8'  # Using default from main()
    search_type = 'default'
    max_pages = 1
    output_path = 'search_results'
    config_path = 'experimental_mods/config.json'

    # NOTE(review): search_videos is called synchronously inside an async
    # handler; if it performs blocking I/O it will stall the event loop -- confirm.
    videos = search_videos(
        keyword=keyword,
        csrf_token=csrf_token,
        search_type=search_type,
        max_pages=max_pages,
        output_path=output_path,
        config_path=config_path,
        early_stop=True,
    )

    return StreamingResponse(simple_generator(videos), media_type="application/octet-stream")


@app.get("/")
async def hi():
    """Return the usage notice shown at the service root."""
    return (
        "Hello, this is Docker as a Service (DaaS)! If you want to use this service, you must duplicate this space. "
        "您好,这里是Docker作为服务(DaaS)!如果您想使用此服务,您必须复制此空间。复制方法:点击https://huggingface.co/spaces/hamercity/bbdown页面右上角的三个点,然后选择“复制空间”。"
        "此外,在设置中,你还需要修改URL,例如:DAAS_SERVER_URL = \"https://你的用户名-你的空间名.hf.space/stream\""
    )


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=49000)