ev-assistant / app.py
ofermend's picture
NA
72518a3
raw
history blame
6.88 kB
from PIL import Image
import sys
import os
import streamlit as st
from streamlit_pills import pills
import sqlite3
import pandas as pd
from datasets import load_dataset
from vectara_agent.agent import AgentStatusType
from agent import initialize_agent, get_agent_config
initial_prompt = "How can I help you today?"
def toggle_logs():
st.session_state.show_logs = not st.session_state.show_logs
def show_example_questions():
if len(st.session_state.example_messages) > 0 and st.session_state.first_turn:
selected_example = pills("Queries to Try:", st.session_state.example_messages, index=None)
if selected_example:
st.session_state.ex_prompt = selected_example
st.session_state.first_turn = False
return True
return False
def update_func(status_type: AgentStatusType, msg: str):
if status_type != AgentStatusType.AGENT_UPDATE:
output = f"{status_type.value} - {msg}"
st.session_state.log_messages.append(output)
def launch_bot():
def reset():
st.session_state.messages = [{"role": "assistant", "content": initial_prompt, "avatar": "πŸ¦–"}]
st.session_state.thinking_message = "Agent at work..."
st.session_state.log_messages = []
st.session_state.prompt = None
st.session_state.ex_prompt = None
st.session_state.first_turn = True
st.session_state.show_logs = False
if 'agent' not in st.session_state:
st.session_state.agent = initialize_agent(cfg, update_func=update_func)
if 'cfg' not in st.session_state:
cfg = get_agent_config()
st.session_state.cfg = cfg
st.session_state.ex_prompt = None
example_messages = [example.strip() for example in cfg.examples.split(",")] if cfg.examples else []
st.session_state.example_messages = [em for em in example_messages if len(em)>0]
reset()
cfg = st.session_state.cfg
# left side content
with st.sidebar:
image = Image.open('Vectara-logo.png')
st.image(image, width=175)
st.markdown(f"## {cfg['demo_welcome']}")
st.markdown(f"{cfg['demo_description']}")
st.markdown("\n\n")
bc1, _ = st.columns([1, 1])
with bc1:
if st.button('Start Over'):
reset()
st.rerun()
st.markdown("---")
st.markdown(
"## How this works?\n"
"This app was built with [Vectara](https://vectara.com).\n\n"
"It demonstrates the use of Agentic RAG functionality with Vectara"
)
if "messages" not in st.session_state.keys():
reset()
# Display chat messages
for message in st.session_state.messages:
with st.chat_message(message["role"], avatar=message["avatar"]):
st.write(message["content"])
example_container = st.empty()
with example_container:
if show_example_questions():
example_container.empty()
st.session_state.first_turn = False
st.rerun()
# User-provided prompt
if st.session_state.ex_prompt:
prompt = st.session_state.ex_prompt
else:
prompt = st.chat_input()
if prompt:
st.session_state.messages.append({"role": "user", "content": prompt, "avatar": 'πŸ§‘β€πŸ’»'})
st.session_state.prompt = prompt # Save the prompt in session state
st.session_state.log_messages = []
st.session_state.show_logs = False
with st.chat_message("user", avatar='πŸ§‘β€πŸ’»'):
print(f"Starting new question: {prompt}\n")
st.write(prompt)
st.session_state.ex_prompt = None
# Generate a new response if last message is not from assistant
if st.session_state.prompt:
with st.chat_message("assistant", avatar='πŸ€–'):
with st.spinner(st.session_state.thinking_message):
res = st.session_state.agent.chat(st.session_state.prompt)
res = res.replace('$', '\\$') # escape dollar sign for markdown
message = {"role": "assistant", "content": res, "avatar": 'πŸ€–'}
st.session_state.messages.append(message)
st.markdown(res)
st.session_state.ex_prompt = None
st.session_state.prompt = None
st.session_state.first_turn = False
st.rerun()
log_placeholder = st.empty()
with log_placeholder.container():
if st.session_state.show_logs:
st.button("Hide Logs", on_click=toggle_logs)
for msg in st.session_state.log_messages:
st.text(msg)
else:
if len(st.session_state.log_messages) > 0:
st.button("Show Logs", on_click=toggle_logs)
sys.stdout.flush()
def setup_db():
db_path = 'ev_database.db'
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
with st.spinner("Loading data... Please wait..."):
def tables_populated() -> bool:
tables = ['ev_population', 'county_registrations', 'ev_registrations']
for table in tables:
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'")
result = cursor.fetchone()
if not result:
return False
return True
if tables_populated():
print("Database tables already populated, skipping setup")
conn.close()
return
else:
print("Populating database tables")
# Execute the SQL commands to create tables
with open('create_tables.sql', 'r') as sql_file:
sql_script = sql_file.read()
cursor.executescript(sql_script)
hf_token = os.getenv('HF_TOKEN')
# Load data into ev_population table
df = load_dataset("vectara/ev-dataset", data_files="Electric_Vehicle_Population_Data.csv", token=hf_token)['train'].to_pandas()
df.to_sql('ev_population', conn, if_exists='replace', index=False)
# Load data into county_registrations table
df = load_dataset("vectara/ev-dataset", data_files="Electric_Vehicle_Population_Size_History_By_County.csv", token=hf_token)['train'].to_pandas()
df.to_sql('county_registrations', conn, if_exists='replace', index=False)
# Load data into ev_registrations table
df = load_dataset("vectara/ev-dataset", data_files="Electric_Vehicle_Title_and_Registration_Activity.csv", token=hf_token)['train'].to_pandas()
df.to_sql('ev_registrations', conn, if_exists='replace', index=False)
# Commit changes and close connection
conn.commit()
conn.close()
if __name__ == "__main__":
st.set_page_config(page_title="Electric Vehicles Assistant", layout="wide")
setup_db()
launch_bot()