Spaces:
Sleeping
Sleeping
import os | |
import asyncio | |
import json | |
import re | |
import requests | |
import streamlit as st | |
from lagent.agents import Agent | |
from lagent.prompts.parsers import PluginParser | |
from lagent.agents.stream import PLUGIN_CN, get_plugin_prompt | |
from lagent.schema import AgentMessage | |
from lagent.actions import ArxivSearch | |
from lagent.hooks import Hook | |
from lagent.llms import GPTAPI | |
YOUR_TOKEN_HERE = os.getenv("token") | |
if not YOUR_TOKEN_HERE: | |
raise EnvironmentError("未找到环境变量 'token',请设置后再运行程序。") | |
# Hook类,用于对消息添加前缀 | |
class PrefixedMessageHook(Hook): | |
def __init__(self, prefix, senders=None): | |
""" | |
初始化Hook | |
:param prefix: 消息前缀 | |
:param senders: 指定发送者列表 | |
""" | |
self.prefix = prefix | |
self.senders = senders or [] | |
def before_agent(self, agent, messages, session_id): | |
""" | |
在代理处理消息前修改消息内容 | |
:param agent: 当前代理 | |
:param messages: 消息列表 | |
:param session_id: 会话ID | |
""" | |
for message in messages: | |
if message.sender in self.senders: | |
message.content = self.prefix + message.content | |
class AsyncBlogger: | |
"""博客生成类,整合写作者和批评者。""" | |
def __init__(self, model_type, api_base, writer_prompt, critic_prompt, critic_prefix='', max_turn=2): | |
""" | |
初始化博客生成器 | |
:param model_type: 模型类型 | |
:param api_base: API 基地址 | |
:param writer_prompt: 写作者提示词 | |
:param critic_prompt: 批评者提示词 | |
:param critic_prefix: 批评消息前缀 | |
:param max_turn: 最大轮次 | |
""" | |
self.model_type = model_type | |
self.api_base = api_base | |
self.llm = GPTAPI( | |
model_type=model_type, | |
api_base=api_base, | |
key=YOUR_TOKEN_HERE, | |
max_new_tokens=4096, | |
) | |
self.plugins = [dict(type='lagent.actions.ArxivSearch')] | |
self.writer = Agent( | |
self.llm, | |
writer_prompt, | |
name='写作者', | |
output_format=dict( | |
type=PluginParser, | |
template=PLUGIN_CN, | |
prompt=get_plugin_prompt(self.plugins) | |
) | |
) | |
self.critic = Agent( | |
self.llm, | |
critic_prompt, | |
name='批评者', | |
hooks=[PrefixedMessageHook(critic_prefix, ['写作者'])] | |
) | |
self.max_turn = max_turn | |
async def forward(self, message: AgentMessage, update_placeholder): | |
""" | |
执行多阶段博客生成流程 | |
:param message: 初始消息 | |
:param update_placeholder: Streamlit占位符 | |
:return: 最终优化的博客内容 | |
""" | |
step1_placeholder = update_placeholder.container() | |
step2_placeholder = update_placeholder.container() | |
step3_placeholder = update_placeholder.container() | |
# 第一步:生成初始内容 | |
step1_placeholder.markdown("**Step 1: 生成初始内容...**") | |
message = self.writer(message) | |
if message.content: | |
step1_placeholder.markdown(f"**生成的初始内容**:\n\n{message.content}") | |
else: | |
step1_placeholder.markdown("**生成的初始内容为空,请检查生成逻辑。**") | |
# 第二步:批评者提供反馈 | |
step2_placeholder.markdown("**Step 2: 批评者正在提供反馈和文献推荐...**") | |
message = self.critic(message) | |
if message.content: | |
# 解析批评者反馈 | |
suggestions = re.search(r"1\. 批评建议:\n(.*?)2\. 推荐的关键词:", message.content, re.S) | |
keywords = re.search(r"2\. 推荐的关键词:\n- (.*)", message.content) | |
feedback = suggestions.group(1).strip() if suggestions else "未提供批评建议" | |
keywords = keywords.group(1).strip() if keywords else "未提供关键词" | |
# Arxiv 文献查询 | |
arxiv_search = ArxivSearch() | |
arxiv_results = arxiv_search.get_arxiv_article_information(keywords) | |
# 显示批评内容和文献推荐 | |
message.content = f"**批评建议**:\n{feedback}\n\n**推荐的文献**:\n{arxiv_results}" | |
step2_placeholder.markdown(f"**批评和文献推荐**:\n\n{message.content}") | |
else: | |
step2_placeholder.markdown("**批评内容为空,请检查批评逻辑。**") | |
# 第三步:写作者根据反馈优化内容 | |
step3_placeholder.markdown("**Step 3: 根据反馈改进内容...**") | |
improvement_prompt = AgentMessage( | |
sender="critic", | |
content=( | |
f"根据以下批评建议和推荐文献对内容进行改进:\n\n" | |
f"批评建议:\n{feedback}\n\n" | |
f"推荐文献:\n{arxiv_results}\n\n" | |
f"请优化初始内容,使其更加清晰、丰富,并符合专业水准。" | |
), | |
) | |
message = self.writer(improvement_prompt) | |
if message.content: | |
step3_placeholder.markdown(f"**最终优化的博客内容**:\n\n{message.content}") | |
else: | |
step3_placeholder.markdown("**最终优化的博客内容为空,请检查生成逻辑。**") | |
return message | |
def setup_sidebar(): | |
"""设置侧边栏,选择模型。""" | |
model_name = st.sidebar.text_input('模型名称:', value='internlm2.5-latest') | |
api_base = st.sidebar.text_input( | |
'API Base 地址:', value='https://internlm-chat.intern-ai.org.cn/puyu/api/v1/chat/completions' | |
) | |
return model_name, api_base | |
def main(): | |
""" | |
主函数:构建Streamlit界面并处理用户交互 | |
""" | |
st.set_page_config(layout='wide', page_title='Lagent Web Demo', page_icon='🤖') | |
st.title("多代理博客优化助手") | |
model_type, api_base = setup_sidebar() | |
topic = st.text_input('输入一个话题:', 'Self-Supervised Learning') | |
generate_button = st.button('生成博客内容') | |
if ( | |
'blogger' not in st.session_state or | |
st.session_state['model_type'] != model_type or | |
st.session_state['api_base'] != api_base | |
): | |
st.session_state['blogger'] = AsyncBlogger( | |
model_type=model_type, | |
api_base=api_base, | |
writer_prompt="你是一位优秀的AI内容写作者,请撰写一篇有吸引力且信息丰富的博客内容。", | |
critic_prompt=""" | |
作为一位严谨的批评者,请给出建设性的批评和改进建议,并基于相关主题使用已有的工具推荐一些参考文献,推荐的关键词应该是英语形式,简洁且切题。 | |
请按照以下格式提供反馈: | |
1. 批评建议: | |
- (具体建议) | |
2. 推荐的关键词: | |
- (关键词1, 关键词2, ...) | |
""", | |
critic_prefix="请批评以下内容,并提供改进建议:\n\n" | |
) | |
st.session_state['model_type'] = model_type | |
st.session_state['api_base'] = api_base | |
if generate_button: | |
update_placeholder = st.empty() | |
async def run_async_blogger(): | |
message = AgentMessage( | |
sender='user', | |
content=f"请撰写一篇关于{topic}的博客文章,要求表达专业,生动有趣,并且易于理解。" | |
) | |
result = await st.session_state['blogger'].forward(message, update_placeholder) | |
return result | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
loop.run_until_complete(run_async_blogger()) | |
if __name__ == '__main__': | |
main() |