File size: 10,255 Bytes
444f09e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
from collections.abc import Callable, Iterable, Mapping
from typing import Any
from toolbox import CatchException, update_ui, gen_time_str, trimmed_format_exc
from toolbox import promote_file_to_downloadzone, get_log_folder
from .crazy_utils import request_gpt_model_in_new_thread_with_ui_alive
from .crazy_utils import input_clipping, try_install_deps
from multiprocessing import Process, Pipe
import os
import time

# Prompt fragment appended to the second request in gpt_interact_multi_step.
# It shows the model the skeleton its code must follow: a class named
# `TerminalFunction` with a `run(self, path)` method returning the generated
# file path. The text is sent verbatim, so editing it changes runtime behavior.
# NOTE(review): the name is a misspelling of "template"; it is referenced by
# that spelling elsewhere in this file.
templete = """
```python
import ...  # Put dependencies here, e.g. import numpy as np

class TerminalFunction(object): # Do not change the name of the class, The name of the class must be `TerminalFunction`

    def run(self, path):    # The name of the function must be `run`, it takes only a positional argument.
        # rewrite the function you have just written here 
        ...
        return generated_file_path
```
"""

def inspect_dependency(chatbot, history):
    """Delegate once to ``update_ui`` and finish with ``True``.

    This is a generator: everything produced by
    ``update_ui(chatbot=chatbot, history=history)`` is re-yielded, and
    ``True`` becomes the generator's return value (what a caller receives
    from ``yield from inspect_dependency(...)``).
    """
    ui_refresh = update_ui(chatbot=chatbot, history=history)  # refresh the interface
    yield from ui_refresh
    return True

def get_code_block(reply):
    """Extract the generated source from a markdown-formatted model reply.

    Args:
        reply: Text expected to contain one or more fenced code blocks
            delimited by triple backticks.

    Returns:
        The body of the only fenced block when exactly one exists; otherwise
        the body of the first block containing ``class TerminalFunction``.
        A fence language tag (``python``, ``python3`` or ``py``) standing
        alone on the first line is removed; the rest is returned unchanged.

    Raises:
        RuntimeError: If the reply holds no fenced block, or holds several
            and none of them contains ``class TerminalFunction``.
    """
    import re

    def drop_language_tag(block):
        # `block.strip('python')` treats its argument as a *set of characters*
        # and strips them from both ends, which also eats real code (for
        # example a trailing identifier `n`, or a leading `print`). Remove
        # only a language tag that is alone on the opening fence line.
        return re.sub(r"\A(?:python3?|py)(?=[ \t]*(?:\r?\n|\Z))", "", block, count=1)

    fenced_blocks = re.findall(r"```([\s\S]*?)```", reply)
    if len(fenced_blocks) == 1:
        return drop_language_tag(fenced_blocks[0])
    for block in fenced_blocks:
        if 'class TerminalFunction' in block:
            return drop_language_tag(block)
    raise RuntimeError("GPT is not generating proper code.")

def gpt_interact_multi_step(txt, file_type, llm_kwargs, chatbot, history):
    """Ask the model, in two rounds, for code that follows the `templete` layout.

    This is a generator; it re-yields whatever the request helper and
    ``update_ui`` yield, and its result is obtained with ``yield from``.

    Round 1 asks for a single Python function that handles a ``file_type``
    file and performs the task described by ``txt``. Round 2 asks the model to
    rewrite that function to fit the module-level ``templete`` string.

    Args:
        txt: The user's task description; it is concatenated into the prompt.
        file_type: File extension/kind named in the prompt.
        llm_kwargs: Passed through unchanged to the request helper.
        chatbot: Passed through to the request helper and to ``update_ui``.
        history: List that is extended IN PLACE with both prompt/reply pairs.

    Returns:
        The tuple ``(code_to_return, installation_advance, txt, file_type,
        llm_kwargs, chatbot, history)``, where ``code_to_return`` is the raw
        round-2 reply (not yet stripped of markdown) and
        ``installation_advance`` is always the empty string.
    """
    # Input: compose the first prompt
    prompt_compose = [
        f'Your job:\n'
        f'1. write a single Python function, which takes a path of a `{file_type}` file as the only argument and returns a `string` containing the result of analysis or the path of generated files. \n',
        f"2. You should write this function to perform following task: " + txt + "\n",
        f"3. Wrap the output python function with markdown codeblock."
    ]
    i_say = "".join(prompt_compose)
    demo = []

    # Step 1: the request is sent with an empty history (`demo`), not the caller's history
    gpt_say = yield from request_gpt_model_in_new_thread_with_ui_alive(
        inputs=i_say, inputs_show_user=i_say, 
        llm_kwargs=llm_kwargs, chatbot=chatbot, history=demo, 
        sys_prompt= r"You are a programmer."
    )
    history.extend([i_say, gpt_say])
    yield from update_ui(chatbot=chatbot, history=history) # refresh the UI

    # Step 2: this request carries `history`, which now includes the step-1 exchange
    prompt_compose = [
        "If previous stage is successful, rewrite the function you have just written to satisfy following templete: \n",
        templete
    ]
    # The user sees a shortened message; the model receives the full prompt with the template text.
    i_say = "".join(prompt_compose); inputs_show_user = "If previous stage is successful, rewrite the function you have just written to satisfy executable templete. "
    gpt_say = yield from request_gpt_model_in_new_thread_with_ui_alive(
        inputs=i_say, inputs_show_user=inputs_show_user, 
        llm_kwargs=llm_kwargs, chatbot=chatbot, history=history, 
        sys_prompt= r"You are a programmer."
    )
    code_to_return = gpt_say
    history.extend([i_say, gpt_say])
    yield from update_ui(chatbot=chatbot, history=history) # refresh the UI
    
    # # Step 3 (disabled)
    # i_say = "Please list to packages to install to run the code above. Then show me how to use `try_install_deps` function to install them."
    # i_say += 'For instance. `try_install_deps(["opencv-python", "scipy", "numpy"])`'
    # installation_advance = yield from request_gpt_model_in_new_thread_with_ui_alive(
    #     inputs=i_say, inputs_show_user=inputs_show_user, 
    #     llm_kwargs=llm_kwargs, chatbot=chatbot, history=history, 
    #     sys_prompt= r"You are a programmer."
    # )
    # # # Step 3, alternative (disabled)
    # i_say = "Show me how to use `pip` to install packages to run the code above. "
    # i_say += 'For instance. `pip install -r opencv-python scipy numpy`'
    # installation_advance = yield from request_gpt_model_in_new_thread_with_ui_alive(
    #     inputs=i_say, inputs_show_user=i_say, 
    #     llm_kwargs=llm_kwargs, chatbot=chatbot, history=history, 
    #     sys_prompt= r"You are a programmer."
    # )
    # With step 3 disabled, no installation advice is produced.
    installation_advance = ""
    
    return code_to_return, installation_advance, txt, file_type, llm_kwargs, chatbot, history

def make_module(code):
    """Save generated source as a new module file and return a locator for it.

    The source is written to ``gpt_fn_<timestamp>.py`` (dashes in the
    timestamp replaced by underscores) inside the folder returned by
    ``get_log_folder()``.

    Args:
        code: Python source text that defines at least one class.

    Returns:
        A string ``"<module path>-><class name>"``, where the module path is
        the log folder with ``/`` replaced by ``.`` followed by the new module
        name, and the class name is that of the first class statement found
        in ``code``. This is the format ``init_module_instance`` splits on.

    Raises:
        RuntimeError: If ``code`` contains no class statement. Nothing is
            written to disk in that case.
    """
    import re

    # Accept both `class Name(Base):` and `class Name:`. Requiring "(" right
    # after the name rejected the second, equally valid, form and surfaced as
    # an opaque AttributeError on `None.group`.
    class_stmt = re.search(r'^[ \t]*class[ \t]+(\w+)[ \t]*[(:]', code, flags=re.MULTILINE)
    if class_stmt is None:
        raise RuntimeError("Generated code does not define a class.")
    class_name = class_stmt.group(1)

    log_folder = get_log_folder()
    module_file = 'gpt_fn_' + gen_time_str().replace('-', '_')
    with open(f'{log_folder}/{module_file}.py', 'w', encoding='utf8') as f:
        f.write(code)

    # NOTE(review): the dotted path is only importable if the log folder is a
    # relative, "/"-separated path reachable from sys.path; confirm against
    # get_log_folder().
    return f"{log_folder.replace('/', '.')}.{module_file}->{class_name}"

def init_module_instance(module):
    """Import a class from a ``"<module path>-><class name>"`` locator and instantiate it.

    Args:
        module: Locator string containing exactly one ``->`` separator.

    Returns:
        A new instance of the named class, built with no arguments.
    """
    from importlib import import_module

    module_path, class_name = module.split('->')
    loaded_module = import_module(module_path)
    factory = getattr(loaded_module, class_name)
    return factory()

def for_immediate_show_off_when_possible(file_type, fp, chatbot):
    """Append an inline image preview to the chat when ``file_type`` is an image.

    Args:
        file_type: File extension without the dot; only ``png`` and ``jpg``
            trigger a preview.
        fp: Path of the file to preview; it is made absolute before display.
        chatbot: Chat message list; a two-item entry is appended in place.

    Returns:
        The same ``chatbot`` object, with or without the extra entry.
    """
    previewable_types = ('png', 'jpg')
    if file_type not in previewable_types:
        return chatbot

    image_path = os.path.abspath(fp)
    location_html = f'本地文件地址: <br/>`{image_path}`<br/>'
    preview_html = f'本地文件预览: <br/><div align="center"><img src="file={image_path}"></div>'
    chatbot.append(['这是一张图片, 展示如下:', location_html + preview_html])
    return chatbot

def subprocess_worker(instance, file_path, return_dict):
    """Run ``instance.run(file_path)`` and store the outcome under ``return_dict['result']``.

    Nothing is returned; the result is handed back through ``return_dict``.
    """
    outcome = instance.run(file_path)
    return_dict['result'] = outcome

def have_any_recent_upload_files(chatbot):
    """Tell whether the chatbot's cookies record an upload newer than five minutes.

    Args:
        chatbot: Object exposing a ``_cookies`` mapping. Any falsy value
            (for example ``None``) yields ``False``.

    Returns:
        ``True`` only when ``_cookies["most_recent_uploaded"]`` is present and
        truthy and its ``"time"`` entry is less than 300 seconds before now.
    """
    freshness_window = 5 * 60  # seconds
    if chatbot:
        latest_upload = chatbot._cookies.get("most_recent_uploaded", None)
        if latest_upload and time.time() - latest_upload["time"] < freshness_window:
            return True
    # No chatbot, no recorded upload, or the recorded upload is too old.
    return False

def get_recent_file_prompt_support(chatbot):
    """Return the ``path`` recorded for the most recent upload in the chatbot's cookies.

    No check is made that an upload is recorded; callers are expected to have
    verified that first.
    """
    return chatbot._cookies.get("most_recent_uploaded", None)['path']

@CatchException
def 虚空终端CodeInterpreter(txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, web_port):
    """Plugin entry point for the open-source "CodeInterpreter" (currently disabled).

    The body begins with ``raise NotImplementedError``, so nothing below that
    statement runs. Because the function contains ``yield from``, it is a
    generator function and the exception is raised when it is first iterated.

    The unreachable remainder sketches the intended flow: pick the most
    recently uploaded file, have the model generate a ``TerminalFunction``
    class (up to 5 attempts), run it in a separate process with a 10-second
    limit, and show the result in the chat.

    txt             Text typed by the user in the input box, e.g. a passage to translate, or a path containing files to process
    llm_kwargs      GPT model parameters such as temperature and top_p; normally passed through unchanged
    plugin_kwargs   Plugin parameters; the unreachable body reads the key "recently_uploaded_files" from it
    chatbot         Handle of the chat display, used to show messages to the user
    history         Chat history (prior context)
    system_prompt   Silent system prompt for GPT
    web_port        Port the application is currently running on
    """
    # Deliberately disabled: everything below is unreachable until this line is removed.
    raise NotImplementedError

    # Clear the history to avoid overflowing the model input
    # NOTE(review): `clear_file_downloadzone` is not imported in the visible part of this file.
    history = []; clear_file_downloadzone(chatbot)

    # Basic information: what the plugin does
    chatbot.append([
        "函数插件功能?",
        "CodeInterpreter开源版, 此插件处于开发阶段, 建议暂时不要使用, 插件初始化中 ..."
    ])
    yield from update_ui(chatbot=chatbot, history=history) # refresh the UI

    if have_any_recent_upload_files(chatbot):
        file_path = get_recent_file_prompt_support(chatbot)
    else:
        # NOTE(review): this branch only reports the problem; execution continues
        # and `file_path` is reassigned unconditionally below either way.
        chatbot.append(["文件检索", "没有发现任何近期上传的文件。"])
        yield from update_ui(chatbot=chatbot, history=history) # refresh the UI

    # Read the file: an empty-string entry is treated as absent
    if ("recently_uploaded_files" in plugin_kwargs) and (plugin_kwargs["recently_uploaded_files"] == ""): plugin_kwargs.pop("recently_uploaded_files")
    recently_uploaded_files = plugin_kwargs.get("recently_uploaded_files", None)
    # NOTE(review): when the key is missing this subscripts None and raises TypeError.
    file_path = recently_uploaded_files[-1]
    file_type = file_path.split('.')[-1]

    # Sanity check for a common slip: the input box still holds the upload folder instead of a request
    # NOTE(review): `is_the_upload_folder` is not imported in the visible part of this file.
    if is_the_upload_folder(txt):
        chatbot.append([
            "...",
            f"请在输入框内填写需求,然后再次点击该插件(文件路径 {file_path} 已经被记忆)"
        ])
        yield from update_ui(chatbot=chatbot, history=history) # refresh the UI
        return
    
    # Main work: generate code, write it out as a module, and instantiate its class
    for j in range(5):  # retry at most 5 times
        try:
            code, installation_advance, txt, file_type, llm_kwargs, chatbot, history = \
                yield from gpt_interact_multi_step(txt, file_type, llm_kwargs, chatbot, history)
            code = get_code_block(code)
            res = make_module(code)
            instance = init_module_instance(res)
            break
        except Exception as e:
            # Report the failed attempt and try again.
            chatbot.append([f"第{j}次代码生成尝试,失败了", f"错误追踪\n```\n{trimmed_format_exc()}\n```\n"])
            yield from update_ui(chatbot=chatbot, history=history) # refresh the UI

    # Code generation finished; start execution
    # NOTE(review): if all 5 attempts failed, `instance` is unbound; the resulting
    # NameError is caught by the `except Exception` below and shown as an execution failure.
    try:
        import multiprocessing
        manager = multiprocessing.Manager()
        return_dict = manager.dict()

        p = multiprocessing.Process(target=subprocess_worker, args=(instance, file_path, return_dict))
        # only has 10 seconds to run
        p.start(); p.join(timeout=10)
        if p.is_alive(): p.terminate(); p.join()
        p.close()
        # Raises KeyError (handled below) when the worker never stored a result,
        # e.g. after being terminated on timeout.
        res = return_dict['result']
        # Disabled alternative: run in-process with `instance.run(file_path)`.
    except Exception as e:
        chatbot.append(["执行失败了", f"错误追踪\n```\n{trimmed_format_exc()}\n```\n"])
        # Disabled: would also append `installation_advance` as a hint for missing dependencies.
        yield from update_ui(chatbot=chatbot, history=history) # refresh the UI
        return

    # Success: wrap up by showing either the produced file or the returned string
    res = str(res)
    if os.path.exists(res):
        chatbot.append(["执行成功了,结果是一个有效文件", "结果:" + res])
        new_file_path = promote_file_to_downloadzone(res, chatbot=chatbot)
        # NOTE(review): `file_type` is the extension of the INPUT file, not of the produced file.
        chatbot = for_immediate_show_off_when_possible(file_type, new_file_path, chatbot)
        yield from update_ui(chatbot=chatbot, history=history) # refresh the UI
    else:
        chatbot.append(["执行成功了,结果是一个字符串", "结果:" + res])
        yield from update_ui(chatbot=chatbot, history=history) # refresh the UI

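"""
Manual test prompts:
    Crop the image, keeping the lower half
    Swap the blue and red channels of the image
    Convert the image to grayscale
    Convert the csv file to an Excel spreadsheet
"""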