HarmonyAI / app.py
mousamax's picture
deploy
b8b10fd
from operator import itemgetter
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import StrOutputParser
from langchain.schema.runnable import Runnable, RunnablePassthrough, RunnableLambda
from langchain.schema.runnable.config import RunnableConfig
from langchain.memory import ConversationBufferMemory
from resolution_logic import ResolutionLogic
from literal_thread_manager import LiteralThreadManager
from prompt_engineering.prompt_desing import system_prompt, system_prompt_b, system_prompt_questioning
import chainlit as cl
from chainlit.types import ThreadDict
import os
from dotenv import load_dotenv
# Load the environment variables from the .env file
load_dotenv()
jwt_secret_key = os.getenv('CHAINLIT_AUTH_SECRET')
if not jwt_secret_key:
raise ValueError(
"You must provide a JWT secret in the environment to use authentication.")
# Get the value of the OPENAI_API_KEY from the environment variables
openai_api_key = os.getenv("OPENAI_API_KEY")
# Set the OPENAI_API_KEY in the environment
os.environ["OPENAI_API_KEY"] = openai_api_key
manager = LiteralThreadManager(api_key=os.getenv("LITERAL_API_KEY"))
def setup_runnable():
"""
Sets up the runnable pipeline for the chatbot. This pipeline includes a model for generating responses
and memory management for maintaining conversation context.
"""
memory = cl.user_session.get("memory") # type: ConversationBufferMemory
model = ChatOpenAI(streaming=True, model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_messages(
[
("system", system_prompt_questioning),
MessagesPlaceholder(variable_name="history"),
("human", "{question}"),
]
)
runnable = (
RunnablePassthrough.assign(
history=RunnableLambda(
memory.load_memory_variables) | itemgetter("history")
)
| prompt
| model
| StrOutputParser()
)
cl.user_session.set("runnable", runnable)
@cl.password_auth_callback
def auth_callback(username: str, password: str):
"""
Authenticates a user using the provided username and password. If the user does not exist in the
LiteralAI database, a new user is created.
Args:
username (str): The username provided by the user.
password (str): The password provided by the user.
Returns:
cl.User | None: A User object if authentication is successful, create a User otherwise.
"""
auth_user = manager.literal_client.api.get_or_create_user(identifier=username)
if auth_user:
if username != "admin":
return cl.User(
identifier=username, metadata={
"role": "user", "provider": "credentials"}
)
else:
return cl.User(
identifier=username, metadata={
"role": "admin", "provider": "credentials"}
)
else:
return None
def create_and_update_threads(first_res, current_user, partner_user):
"""
Creates and updates threads for the conversation between the current user and their partner.
Args:
first_res (str): The initial response from the user.
current_user (cl.User): The current user initiating the conversation.
partner_user (cl.User): The partner user to connect with.
"""
latest_thread = manager.literal_client.api.get_threads(first=1)
partner_thread = manager.literal_client.api.create_thread(name=first_res['output'], participant_id=partner_user.id, metadata={
"partner_id": current_user.id, "partner_thread_id": latest_thread.data[0].id, "user_id": partner_user.id})
resolver = ResolutionLogic()
message_to_other_partner = resolver.summarize_conflict_topic(partner_user.identifier, current_user.identifier, first_res['output'])
manager.literal_client.api.create_step(thread_id=partner_thread.id, type="assistant_message",
output={'content': message_to_other_partner})
current_thread = manager.literal_client.api.upsert_thread(id=latest_thread.data[0].id,
participant_id=current_user.id, metadata={"partner_id": partner_user.id, "partner_thread_id": partner_thread.id})
cl.user_session.set("thread_id", current_thread.id)
manager.get_other_partner_thread_id(current_thread.id)
@cl.action_callback("2-1 Chat")
async def on_action(action):
"""
Handles the action callback for initiating a 2-1 chat.
Args:
action (cl.Action): The action object containing the user's input.
"""
await cl.Message(content="Write the email and the chat id:").send()
action.get("value")
await action.remove()
@cl.on_chat_start
async def on_chat_start():
"""
Handles the start of a chat session. Initializes the memory, sets up the runnable pipeline, and prompts the user
to summarize the type of conflict.
"""
cl.user_session.set("memory", ConversationBufferMemory(return_messages=True))
setup_runnable()
first_res = await cl.AskUserMessage(content="Welcome to the Relationship Coach chatbot. I can help you with your relationship questions. Please first summarize the type of conflict.").send()
add_person = await cl.AskActionMessage(
content="Select the conversation type.",
actions=[
cl.Action(name="1-1 Chat", value="1-1 Chat", label="πŸ‘€ 1-1"),
cl.Action(name="2-1 Chat", value="2-1 Chat", label="πŸ‘₯ 2-1"),
],
).send()
if add_person and add_person.get("value") == "2-1 Chat":
res = await cl.AskUserMessage(content="Please write the username of the person to connect with.").send()
if res:
# request the parnet username until it exists in the db
while manager.literal_client.api.get_user(identifier=res["output"]) == None:
await cl.Message(content=f"Partner {res['output']} does not exist in db.").send()
res = await cl.AskUserMessage(content="Please write the username of the person to connect with.").send()
partner_username = res['output']
partner_user = manager.literal_client.api.get_user(identifier=partner_username)
current_user = cl.user_session.get("user")
current_username = current_user.identifier
manager.literal_client.api.update_user(id=current_user.id, identifier=current_username, metadata={
"role": "user", "provider": "credentials", "relationships": {"partner_username": partner_username}})
await cl.Message(content=f"Connected with {partner_username}!").send()
await on_message(cl.Message(content=first_res['output']))
create_and_update_threads(first_res, current_user, partner_user)
else:
await cl.Message(
content=f"Action timed out!",
).send()
@cl.on_chat_resume
async def on_chat_resume(thread: ThreadDict):
"""
Handles the resumption of a chat session. Restores the chat memory and sets up the runnable pipeline.
Args:
thread (ThreadDict): The thread dictionary containing the chat history.
"""
memory = ConversationBufferMemory(return_messages=True)
root_messages = [m for m in thread["steps"] if m["parentId"] == None]
for message in root_messages:
if message["type"] == "user_message":
memory.chat_memory.add_user_message(message["output"])
else:
memory.chat_memory.add_ai_message(message["output"])
cl.user_session.set("memory", memory)
cl.user_session.set("thread_id", thread["id"])
setup_runnable()
conflict_resolution = ResolutionLogic()
resolution = conflict_resolution.intervention(thread["id"])
if resolution:
await cl.Message(content=resolution).send()
@cl.on_message
async def on_message(message: cl.Message):
"""
Handles incoming messages during a chat session. Updates the memory and generates a response.
Args:
message (cl.Message): The incoming message from the user.
"""
memory = cl.user_session.get("memory") # type: ConversationBufferMemory
runnable = cl.user_session.get("runnable") # type: Runnable
response = cl.Message(content="")
conflict_resolution = ResolutionLogic()
if cl.user_session.get("thread_id"):
resolution = conflict_resolution.intervention(cl.user_session.get("thread_id"))
if cl.user_session.get("thread_id") and resolution:
response = cl.Message(content=resolution)
else:
async for chunk in runnable.astream(
{"question": message.content},
config=RunnableConfig(callbacks=[cl.LangchainCallbackHandler()]),
):
await response.stream_token(chunk)
await response.send()
memory.chat_memory.add_user_message(message.content)
memory.chat_memory.add_ai_message(response.content)
def main():
"""
The main function to demonstrate the usage of the chatbot. Initializes the chat session and starts the event loop.
"""
on_chat_start()
cl.run()
if __name__ == "__main__":
main()