File size: 4,637 Bytes
312898d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""
DaaS (Docker as a Service) is a service 
that allows users to run docker commands on the server side.
"""

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, Dict
import time
import os
import asyncio
import subprocess
import uuid
import threading
import queue
from shared_utils.docker_as_service_api import DockerServiceApiComModel

# Application instance; the route handlers defined below register on it
# through the @app.post / @app.get decorators.
app = FastAPI()

def python_obj_to_pickle_file_bytes(obj):
    """Serialize ``obj`` with pickle and return the resulting bytes.

    The result is the same byte string that ``pickle.dump`` would write to a
    binary file when called with the default protocol.
    """
    from pickle import dumps

    serialized = dumps(obj)
    return serialized

def yield_message(message):
    """Wrap ``message`` in a DockerServiceApiComModel and return it pickled.

    The text is stored in the model's ``server_message`` field; the return
    value is whatever ``python_obj_to_pickle_file_bytes`` produces for that
    model instance.
    """
    return python_obj_to_pickle_file_bytes(
        DockerServiceApiComModel(server_message=message)
    )

def read_output(stream, output_queue):
    """Forward every line of ``stream`` to ``output_queue`` until end of stream.

    Lines are fetched with ``stream.readline()`` and pushed with
    ``output_queue.put()``; the loop ends at the first empty (falsy) read.
    The call blocks while waiting for data, which is why the caller in this
    file runs it as a thread target.
    """
    chunk = stream.readline()
    while chunk:
        output_queue.put(chunk)
        chunk = stream.readline()


def _drain_queue(pending):
    """Return every item currently waiting in ``pending`` without blocking."""
    items = []
    while True:
        try:
            items.append(pending.get_nowait())
        except queue.Empty:
            return items


async def stream_generator(request_obj):
    """Run BBDown for the requested video and stream progress as pickled messages.

    Args:
        request_obj: Object whose ``client_command`` attribute is a string
            holding the video id (and optionally extra BBDown arguments).

    Yields:
        bytes: Pickled ``DockerServiceApiComModel`` objects, in this order:
        the command line being run, each stdout/stderr line produced by the
        process, ``"(return code:) N"``, and finally a ``"complete"`` message
        whose ``server_file_attach`` maps each file name found in the
        temporary work directory to its content.
    """
    import shlex
    import tempfile

    bbdown = '/root/.dotnet/tools/BBDown'

    # The work directory (and everything downloaded into it) is removed when
    # the generator finishes or is closed.
    with tempfile.TemporaryDirectory() as temp_dir:
        download_folder = os.path.abspath(temp_dir)
        video_id = request_obj.client_command
        trailing_args = ['--use-app-api', '--work-dir', download_folder]

        # Echo the command line to the client before running it.
        yield yield_message(' '.join([bbdown, video_id, *trailing_args]))

        # SECURITY: client_command is supplied by the caller. It is tokenised
        # with shlex and passed as an argument vector (no shell), so shell
        # metacharacters such as ';', '|' or '$(...)' are never interpreted.
        # NOTE(review): the client can still pass arbitrary BBDown options.
        process = None
        start_error = None
        try:
            argv = [bbdown, *shlex.split(video_id), *trailing_args]
            process = subprocess.Popen(
                argv,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except (OSError, ValueError) as exc:
            # Unbalanced quotes in client_command, or a missing/non-executable
            # binary. Report it instead of aborting the stream.
            start_error = exc

        if process is None:
            yield yield_message(f"failed to start BBDown: {start_error}")
        else:
            try:
                # Blocking pipe reads happen in daemon threads; the event loop
                # only polls the queues they fill.
                readers = []
                for pipe in (process.stdout, process.stderr):
                    lines = queue.Queue()
                    worker = threading.Thread(
                        target=read_output, args=(pipe, lines), daemon=True
                    )
                    worker.start()
                    readers.append((worker, lines))

                finished = False
                while not finished:
                    finished = process.poll() is not None
                    if finished:
                        # Give the reader threads a bounded grace period (1 s)
                        # to reach EOF so trailing output is not lost.
                        for _attempt in range(20):
                            if not any(w.is_alive() for w, _lines in readers):
                                break
                            await asyncio.sleep(0.05)

                    # Forward everything queued so far; stdout and stderr are
                    # drained independently of each other.
                    for _worker, lines in readers:
                        for line in _drain_queue(lines):
                            print(line)
                            yield yield_message(line)

                    if not finished:
                        await asyncio.sleep(0.25)

                yield yield_message("(return code:) " + str(process.returncode))
            finally:
                # If the consumer went away mid-stream, do not leave the
                # download running against a directory about to be deleted.
                if process.poll() is None:
                    process.kill()
                process.wait()

        # Attach every entry found in the (initially empty) work directory.
        # NOTE(review): a directory entry here would make open() raise -
        # confirm BBDown only writes plain files at the top level.
        server_file_attach = {}
        for file_name in os.listdir(download_folder):
            with open(os.path.join(download_folder, file_name), "rb") as f:
                server_file_attach[file_name] = f.read()

        dsacm = DockerServiceApiComModel(
            server_message="complete",
            server_file_attach=server_file_attach,
        )
        yield python_obj_to_pickle_file_bytes(dsacm)


@app.post("/stream")
async def stream_response(file: UploadFile = File(...)):
    """Accept a pickled request object and stream the job output back.

    Args:
        file: Uploaded file whose content is a pickle of the request object
            handed to ``stream_generator``.

    Returns:
        StreamingResponse: ``application/octet-stream`` body made of the
        chunks yielded by ``stream_generator``.

    Raises:
        HTTPException: 400 when the upload cannot be unpickled.
    """
    import pickle

    content = await file.read()
    # SECURITY: unpickling can execute arbitrary code chosen by whoever built
    # the payload. This endpoint must only be reachable by trusted clients;
    # consider replacing pickle with a data-only format such as JSON.
    try:
        request_obj = pickle.loads(content)
    except (
        pickle.UnpicklingError,
        AttributeError,
        EOFError,
        ImportError,
        IndexError,
    ) as exc:
        # A malformed upload is a client error, not a server failure.
        raise HTTPException(
            status_code=400,
            detail="uploaded file is not a valid pickle payload",
        ) from exc

    return StreamingResponse(
        stream_generator(request_obj),
        media_type="application/octet-stream",
    )

@app.get("/")
async def hi():
    """Return a fixed greeting string for the root path."""
    greeting = "Hello, this is Docker as a Service (DaaS)!"
    return greeting
    
if __name__ == "__main__":
    # When executed as a script, serve the app with uvicorn, listening on
    # all interfaces (0.0.0.0) at port 49000.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=49000)