import os import random import shutil import traceback import gradio as gr from config_utils import get_avatar_image, get_ci_dir, parse_configuration from gradio_utils import ChatBot, format_cover_html from modelscope_agent.utils.logger import agent_logger as logger from user_core import init_user_chatbot_agent uuid_str = 'local_user' builder_cfg, model_cfg, tool_cfg, available_tool_list, _, _ = parse_configuration( uuid_str) suggests = builder_cfg.get('prompt_recommend', []) avatar_pairs = get_avatar_image(builder_cfg.get('avatar', ''), uuid_str) customTheme = gr.themes.Default( primary_hue=gr.themes.utils.colors.blue, radius_size=gr.themes.utils.sizes.radius_none, ) def check_uuid(uuid_str): if not uuid_str or uuid_str == '': if os.getenv('MODELSCOPE_ENVIRONMENT') == 'studio': raise gr.Error('请登陆后使用! (Please login first)') else: uuid_str = 'local_user' return uuid_str def init_user(state): try: seed = state.get('session_seed', random.randint(0, 1000000000)) user_agent = init_user_chatbot_agent(uuid_str) user_agent.seed = seed state['user_agent'] = user_agent except Exception as e: logger.error( uuid=uuid_str, error=str(e), content={'error_traceback': traceback.format_exc()}) return state # 创建 Gradio 界面 demo = gr.Blocks(css='assets/appBot.css', theme=customTheme) with demo: gr.Markdown( '#
\N{fire} AgentFabric powered by Modelscope-agent ([github star](https://github.com/modelscope/modelscope-agent/tree/main))
' # noqa E501 ) draw_seed = random.randint(0, 1000000000) state = gr.State({'session_seed': draw_seed}) with gr.Row(elem_classes='container'): with gr.Column(scale=4): with gr.Column(): # Preview user_chatbot = ChatBot( value=[[None, '尝试问我一点什么吧~']], elem_id='user_chatbot', elem_classes=['markdown-body'], avatar_images=avatar_pairs, height=600, latex_delimiters=[], show_label=False) with gr.Row(): with gr.Column(scale=12): preview_chat_input = gr.Textbox( show_label=False, container=False, placeholder='跟我聊聊吧~') with gr.Column(min_width=70, scale=1): upload_button = gr.UploadButton( '上传', file_types=['file', 'image', 'audio', 'video', 'text'], file_count='multiple') with gr.Column(min_width=70, scale=1): preview_send_button = gr.Button('发送', variant='primary') with gr.Column(scale=1): user_chat_bot_cover = gr.HTML( format_cover_html(builder_cfg, avatar_pairs[1])) user_chat_bot_suggest = gr.Examples( label='Prompt Suggestions', examples=suggests, inputs=[preview_chat_input]) def upload_file(chatbot, upload_button, _state): _uuid_str = check_uuid(uuid_str) new_file_paths = [] if 'file_paths' in _state: file_paths = _state['file_paths'] else: file_paths = [] for file in upload_button: file_name = os.path.basename(file.name) # covert xxx.json to xxx_uuid_str.json file_name = file_name.replace('.', f'_{_uuid_str}.') file_path = os.path.join(get_ci_dir(), file_name) if not os.path.exists(file_path): # make sure file path's directory exists os.makedirs(os.path.dirname(file_path), exist_ok=True) shutil.copy(file.name, file_path) file_paths.append(file_path) new_file_paths.append(file_path) if file_name.endswith(('.jpeg', '.png', '.jpg')): chatbot += [((file_path, ), None)] else: chatbot.append((None, f'上传文件{file_name},成功')) yield { user_chatbot: gr.Chatbot.update(visible=True, value=chatbot), preview_chat_input: gr.Textbox.update(value='') } _state['file_paths'] = file_paths _state['new_file_paths'] = new_file_paths upload_button.upload( upload_file, inputs=[user_chatbot, upload_button, state], outputs=[user_chatbot, preview_chat_input]) def send_message(chatbot, input, _state): # 将发送的消息添加到聊天历史 user_agent = _state['user_agent'] if 'new_file_paths' in _state: new_file_paths = _state['new_file_paths'] else: new_file_paths = [] _state['new_file_paths'] = [] chatbot.append((input, '')) yield { user_chatbot: chatbot, preview_chat_input: gr.Textbox.update(value=''), } response = '' try: for frame in user_agent.stream_run( input, print_info=True, remote=False, append_files=new_file_paths): # is_final = frame.get("frame_is_final") llm_result = frame.get('llm_text', '') exec_result = frame.get('exec_result', '') # llm_result = llm_result.split("<|user|>")[0].strip() if len(exec_result) != 0: # action_exec_result if isinstance(exec_result, dict): exec_result = str(exec_result['result']) frame_text = f'{exec_result}' else: # llm result frame_text = llm_result # important! do not change this response += frame_text chatbot[-1] = (input, response) yield { user_chatbot: chatbot, } except Exception as e: if 'dashscope.common.error.AuthenticationError' in str(e): msg = 'DASHSCOPE_API_KEY should be set via environment variable. You can acquire this in ' \ 'https://help.aliyun.com/zh/dashscope/developer-reference/activate-dashscope-and-create-an-api-key' elif 'rate limit' in str(e): msg = 'Too many people are calling, please try again later.' else: msg = str(e) chatbot[-1] = (input, msg) yield {user_chatbot: chatbot} preview_send_button.click( send_message, inputs=[user_chatbot, preview_chat_input, state], outputs=[user_chatbot, preview_chat_input]) demo.load(init_user, inputs=[state], outputs=[state]) demo.queue() demo.launch()