File size: 6,383 Bytes
312898d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e788445
 
 
 
 
 
 
 
da6e788
 
312898d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19aa87f
312898d
 
 
 
 
 
 
 
 
 
 
da6e788
 
 
 
 
 
312898d
 
 
 
 
 
 
 
 
 
 
da6e788
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
312898d
 
19aa87f
 
 
da6e788
312898d
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""
DaaS (Docker as a Service) is a service 
that allows users to run docker commands on the server side.
"""

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, Dict
import time
import os
import asyncio
import subprocess
import uuid
import threading
import queue
from shared_utils.docker_as_service_api import DockerServiceApiComModel

# Application object; the route handlers below register themselves on it.
app = FastAPI()

def python_obj_to_pickle_file_bytes(obj):
    """Serialize *obj* to pickle bytes for streaming back to the client.

    Returns the raw pickle byte string (default protocol).
    """
    import pickle
    # pickle.dumps produces the same bytes as the former manual
    # BytesIO + pickle.dump round-trip, in one call.
    return pickle.dumps(obj)

def yield_message(message):
    """Wrap *message* in a DockerServiceApiComModel and pickle it for streaming."""
    packet = DockerServiceApiComModel(server_message=message)
    return python_obj_to_pickle_file_bytes(packet)

def read_output(stream, output_queue):
    """Copy lines from *stream* into *output_queue* until EOF.

    Runs on a worker thread so the async loop can poll the queue
    without blocking on the subprocess pipe.
    """
    # readline() returns '' at EOF, which ends the loop.
    while line := stream.readline():
        output_queue.put(line)


async def stream_generator(request_obj):
    """Download a bilibili video with BBDown, streaming progress to the client.

    Yields pickled DockerServiceApiComModel byte chunks: first the command
    line, then each line of subprocess stdout/stderr, then the return code,
    and finally a "complete" message carrying the downloaded files as bytes.

    request_obj.client_command is expected to hold the video id / URL
    (presumably — confirm against the client side).
    """
    import tempfile
    # Work inside a temporary directory so downloads are isolated and
    # cleaned up automatically when the generator finishes.
    with tempfile.TemporaryDirectory() as temp_dir:
        download_folder = temp_dir
        # The temp dir starts empty, so every file present afterwards is new.
        existing_file_before_download = []

        video_id = request_obj.client_command
        cmd = [
            '/root/.dotnet/tools/BBDown',
            video_id,
            '--use-app-api',
            '--work-dir',
            f'{os.path.abspath(temp_dir)}'
        ]

        yield yield_message(' '.join(cmd))
        # Pass the argv list directly (no shell): video_id is client-supplied,
        # and the previous shell=True + joined string allowed shell injection.
        process = subprocess.Popen(cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True)

        # Reader threads move subprocess output into queues so this coroutine
        # can poll without blocking the event loop.
        stdout_queue = queue.Queue()
        stderr_queue = queue.Queue()
        readers = []
        for stream, out_q in ((process.stdout, stdout_queue),
                              (process.stderr, stderr_queue)):
            t = threading.Thread(target=read_output, args=(stream, out_q))
            t.daemon = True
            t.start()
            readers.append(t)

        def _drain(q):
            """Return every line currently queued, without blocking."""
            lines = []
            while True:
                try:
                    lines.append(q.get_nowait())
                except queue.Empty:
                    return lines

        while process.poll() is None:
            # Drain each queue independently; the original single try block
            # skipped stderr whenever the stdout queue happened to be empty.
            for line in _drain(stdout_queue):
                print(line)
                yield yield_message(line)
            for line in _drain(stderr_queue):
                print(line)  # fixed: previously printed the stale stdout line
                yield yield_message(line)
            await asyncio.sleep(0.25)

        # The process has exited; wait for the reader threads to flush any
        # trailing output, then drain one last time so nothing is dropped
        # (the original broke out immediately and could lose final lines).
        for t in readers:
            t.join(timeout=5)
        for line in _drain(stdout_queue):
            yield yield_message(line)
        for line in _drain(stderr_queue):
            yield yield_message(line)

        return_code = process.returncode
        yield yield_message("(return code:) " + str(return_code))

        # Everything new in the work dir is a downloaded artifact.
        downloaded_files = [
            f for f in os.listdir(download_folder)
            if f not in existing_file_before_download
        ]
        server_file_attach = {}
        for fn in downloaded_files:
            fp = os.path.join(download_folder, fn)
            if os.path.isdir(fp):
                continue  # attach file contents only, skip directories
            with open(fp, "rb") as f:
                server_file_attach[fn] = f.read()

        dsacm = DockerServiceApiComModel(
            server_message="complete",
            server_file_attach=server_file_attach,
        )
        yield python_obj_to_pickle_file_bytes(dsacm)


def simple_generator(return_obj):
    """Single-chunk generator: wrap *return_obj* in the service model and pickle it."""
    payload = DockerServiceApiComModel(server_message=return_obj)
    yield python_obj_to_pickle_file_bytes(payload)

@app.post("/stream")
async def stream_response(file: UploadFile = File(...)):
    # read the file in memory, treat it as pickle file, and unpickle it
    import pickle
    import io
    content = await file.read()
    with io.BytesIO(content) as f:
        request_obj = pickle.load(f)
    # process the request_obj
    return StreamingResponse(stream_generator(request_obj), media_type="application/octet-stream")

@app.post("/search")
async def stream_response(file: UploadFile = File(...)):
    # read the file in memory, treat it as pickle file, and unpickle it
    import pickle
    import io
    content = await file.read()
    with io.BytesIO(content) as f:
        request_obj = pickle.load(f)

    # process the request_obj
    keyword = request_obj.client_command

    from experimental_mods.get_search_kw_api_stop import search_videos
    # Default parameters for video search
    csrf_token = '40a227fcf12c380d7d3c81af2cd8c5e8'  # Using default from main()
    search_type = 'default'
    max_pages = 1
    output_path = 'search_results'
    config_path = 'experimental_mods/config.json'

    # Search for videos and return the first result
    videos = search_videos(
        keyword=keyword,
        csrf_token=csrf_token,
        search_type=search_type,
        max_pages=max_pages,
        output_path=output_path,
        config_path=config_path,
        early_stop=True
    )

    return StreamingResponse(simple_generator(videos), media_type="application/octet-stream")

@app.get("/")
async def hi():
    return "Hello, this is Docker as a Service (DaaS)! If you want to use this service, you must duplicate this space. " \
           "您好,这里是Docker作为服务(DaaS)!如果您想使用此服务,您必须复制此空间。复制方法:点击https://huggingface.co/spaces/hamercity/bbdown页面右上角的三个点,然后选择“复制空间”。" \
           "此外,在设置中,你还需要修改URL,例如:DAAS_SERVER_URL = \"https://你的用户名-你的空间名.hf.space/stream\""

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=49000)