File size: 5,512 Bytes
78d94c0 51a8cda 78d94c0 51a8cda 78d94c0 06c67ff 78d94c0 06c67ff 78d94c0 17617e4 78d94c0 ac01374 78d94c0 ac01374 78d94c0 17617e4 78d94c0 ac01374 78d94c0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# encoding:utf-8
from bot.bot import Bot
from config import conf
from common.log import logger
import openai
import time
user_session = dict()
# OpenAI对话模型API (可用)
class ChatGPTBot(Bot):
def __init__(self):
openai.api_key = conf().get('open_ai_api_key')
openai.api_base="https://zen-ai.top/v1"
def reply(self, query, context=None):
# acquire reply content
if not context or not context.get('type') or context.get('type') == 'TEXT':
logger.info("[OPEN_AI] query={}".format(query))
from_user_id = context['from_user_id']
if query == '#清除记忆':
Session.clear_session(from_user_id)
return '记忆已清除'
new_query = Session.build_session_query(query, from_user_id)
logger.debug("[OPEN_AI] session query={}".format(new_query))
# if context.get('stream'):
# # reply in stream
# return self.reply_text_stream(query, new_query, from_user_id)
reply_content = self.reply_text(new_query, from_user_id, 0)
logger.debug("[OPEN_AI] new_query={}, user={}, reply_cont={}".format(new_query, from_user_id, reply_content))
if reply_content:
Session.save_session(query, reply_content, from_user_id)
return reply_content
elif context.get('type', None) == 'IMAGE_CREATE':
return self.create_img(query, 0)
def reply_text(self, query, user_id, retry_count=0):
try:
response = openai.ChatCompletion.create(
model="gpt-4-1106-preview", # 对话模型的名称
messages=query,
temperature=0.5, # 值在[0,1]之间,越大表示回复越具有不确定性
max_tokens=1500, # 回复最大的字符数
top_p=1,
frequency_penalty=0.5, # [-2,2]之间,该值越大则更倾向于产生不同的内容
presence_penalty=0.5, # [-2,2]之间,该值越大则更倾向于产生不同的内容
)
# res_content = response.choices[0]['text'].strip().replace('<|endoftext|>', '')
logger.info(response.choices[0]['message']['content'])
# log.info("[OPEN_AI] reply={}".format(res_content))
return response.choices[0]['message']['content']
except openai.error.RateLimitError as e:
# rate limit exception
logger.warn(e)
if retry_count < 1:
time.sleep(5)
logger.warn("[OPEN_AI] RateLimit exceed, 第{}次重试".format(retry_count+1))
return self.reply_text(query, user_id, retry_count+1)
else:
return "慢点,我先让时间停止一下,稍等"
except Exception as e:
# unknown exception
logger.exception(e)
Session.clear_session(user_id)
return "亲爱的,请说人话,我毕竟只是ZionLee配置的程序,不是万能的。"
def create_img(self, query, retry_count=0):
try:
logger.info("[OPEN_AI] image_query={}".format(query))
response = openai.Image.create(
prompt=query, #图片描述
n=1, #每次生成图片的数量
size="1024x1024" #图片大小,可选有 256x256, 512x512, 1024x1024
)
image_url = response['data'][0]['url']
logger.info("[OPEN_AI] image_url={}".format(image_url))
return image_url
except openai.error.RateLimitError as e:
logger.warn(e)
if retry_count < 1:
time.sleep(5)
logger.warn("[OPEN_AI] ImgCreate RateLimit exceed, 第{}次重试".format(retry_count+1))
return self.reply_text(query, retry_count+1)
else:
return "慢点,我先让时间停止一下,稍等。"
except Exception as e:
logger.exception(e)
return None
class Session(object):
@staticmethod
def build_session_query(query, user_id):
'''
build query with conversation history
e.g. [
{"role": "system", "content": "You are a helpful assistant,let's think step by step in multiple different ways."},
{"role": "user", "content": "Who won the world series in 2020?"},
{"role": "assistant", "content": "The Los Angeles Dodgers won the World Series in 2020."},
{"role": "user", "content": "Where was it played?"}
]
:param query: query content
:param user_id: from user id
:return: query content with conversaction
'''
session = user_session.get(user_id, [])
if len(session) == 0:
system_prompt = conf().get("character_desc", "")
system_item = {'role': 'system', 'content': system_prompt}
session.append(system_item)
user_session[user_id] = session
user_item = {'role': 'user', 'content': query}
session.append(user_item)
return session
@staticmethod
def save_session(query, answer, user_id):
session = user_session.get(user_id)
if session:
# append conversation
gpt_item = {'role': 'assistant', 'content': answer}
session.append(gpt_item)
@staticmethod
def clear_session(user_id):
user_session[user_id] = []
|