"""Streamlit front end for the Electric Vehicles assistant.

Running this file as a script configures the page, makes sure the local
SQLite database has the expected tables, and then renders the chat UI.
"""

from PIL import Image
import sys
import os

import streamlit as st
from streamlit_pills import pills
import sqlite3
import pandas as pd
from datasets import load_dataset

from vectara_agent.agent import AgentStatusType
from agent import initialize_agent, get_agent_config

initial_prompt = "How can I help you today?"

# SQLite file used by setup_db().
_DB_PATH = 'ev_database.db'

# Target table -> CSV file inside the "vectara/ev-dataset" dataset.
# Insertion order is the order in which the tables are loaded.
_TABLE_SOURCES = {
    'ev_population': "Electric_Vehicle_Population_Data.csv",
    'county_registrations': "Electric_Vehicle_Population_Size_History_By_County.csv",
    'ev_registrations': "Electric_Vehicle_Title_and_Registration_Activity.csv",
}


def toggle_logs():
    """Flip the ``show_logs`` flag stored in the Streamlit session state."""
    st.session_state.show_logs = not st.session_state.show_logs


def show_example_questions():
    """Render the example-query pills on the first turn.

    Returns:
        True when the user picked an example (it is stored in
        ``st.session_state.ex_prompt`` and ``first_turn`` is cleared),
        otherwise False.
    """
    if st.session_state.example_messages and st.session_state.first_turn:
        selected_example = pills("Queries to Try:", st.session_state.example_messages, index=None)
        if selected_example:
            st.session_state.ex_prompt = selected_example
            st.session_state.first_turn = False
            return True
    return False


def update_func(status_type: AgentStatusType, msg: str):
    """Record an agent status event in the session log.

    Events of type ``AgentStatusType.AGENT_UPDATE`` are ignored; every other
    event is appended to ``st.session_state.log_messages`` as
    ``"<status value> - <msg>"``.
    """
    if status_type != AgentStatusType.AGENT_UPDATE:
        output = f"{status_type.value} - {msg}"
        st.session_state.log_messages.append(output)


def launch_bot():
    """Render the chat application: sidebar, message history, input and logs."""

    def reset():
        """Restore the conversation-related session state to its initial values.

        The agent is created only if the session does not already hold one,
        so an existing agent object is reused across resets.
        """
        st.session_state.messages = [{"role": "assistant", "content": initial_prompt, "avatar": "🦖"}]
        st.session_state.thinking_message = "Agent at work..."
        st.session_state.log_messages = []
        st.session_state.prompt = None
        st.session_state.ex_prompt = None
        st.session_state.first_turn = True
        st.session_state.show_logs = False
        if 'agent' not in st.session_state:
            # `cfg` is the enclosing function's local; it is always bound
            # before reset() is called.
            st.session_state.agent = initialize_agent(cfg, update_func=update_func)

    if 'cfg' not in st.session_state:
        cfg = get_agent_config()
        st.session_state.cfg = cfg
        st.session_state.ex_prompt = None
        # `examples` is a comma-separated string; drop blanks after stripping.
        example_messages = [example.strip() for example in cfg.examples.split(",")] if cfg.examples else []
        st.session_state.example_messages = [em for em in example_messages if em]
        reset()

    cfg = st.session_state.cfg

    # left side content
    with st.sidebar:
        image = Image.open('Vectara-logo.png')
        st.image(image, width=175)
        st.markdown(f"## {cfg['demo_welcome']}")
        st.markdown(f"{cfg['demo_description']}")

        st.markdown("\n\n")
        bc1, _ = st.columns([1, 1])
        with bc1:
            if st.button('Start Over'):
                reset()
                st.rerun()

        st.markdown("---")
        st.markdown(
            "## How this works?\n"
            "This app was built with [Vectara](https://vectara.com).\n\n"
            "It demonstrates the use of Agentic RAG functionality with Vectara"
        )

    if "messages" not in st.session_state:
        reset()

    # Display chat messages
    for message in st.session_state.messages:
        with st.chat_message(message["role"], avatar=message["avatar"]):
            st.write(message["content"])

    example_container = st.empty()
    with example_container:
        if show_example_questions():
            example_container.empty()
            st.session_state.first_turn = False
            st.rerun()

    # User-provided prompt: a selected example takes precedence over the chat box.
    if st.session_state.ex_prompt:
        prompt = st.session_state.ex_prompt
    else:
        prompt = st.chat_input()
    if prompt:
        st.session_state.messages.append({"role": "user", "content": prompt, "avatar": '🧑‍💻'})
        st.session_state.prompt = prompt  # Save the prompt in session state
        st.session_state.log_messages = []
        st.session_state.show_logs = False
        with st.chat_message("user", avatar='🧑‍💻'):
            print(f"Starting new question: {prompt}\n")
            st.write(prompt)
        st.session_state.ex_prompt = None

    # Generate a new response when a prompt is pending in session state
    if st.session_state.prompt:
        with st.chat_message("assistant", avatar='🤖'):
            with st.spinner(st.session_state.thinking_message):
                res = st.session_state.agent.chat(st.session_state.prompt)
                res = res.replace('$', '\\$')  # escape dollar sign for markdown
            message = {"role": "assistant", "content": res, "avatar": '🤖'}
            st.session_state.messages.append(message)
            st.markdown(res)
        st.session_state.ex_prompt = None
        st.session_state.prompt = None
        st.session_state.first_turn = False
        st.rerun()

    log_placeholder = st.empty()
    with log_placeholder.container():
        if st.session_state.show_logs:
            st.button("Hide Logs", on_click=toggle_logs)
            for msg in st.session_state.log_messages:
                st.text(msg)
        elif st.session_state.log_messages:
            st.button("Show Logs", on_click=toggle_logs)

    sys.stdout.flush()


def _tables_exist(cursor, tables) -> bool:
    """Return True when every name in *tables* is listed in ``sqlite_master``."""
    for table in tables:
        # Bound parameter instead of string interpolation.
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table,),
        )
        if cursor.fetchone() is None:
            return False
    return True


def setup_db():
    """Create and fill the local SQLite database if its tables are missing.

    When all expected tables already exist the function returns without
    touching the data. Otherwise it runs ``create_tables.sql`` and writes each
    CSV of the ``vectara/ev-dataset`` dataset into its table, using the
    ``HF_TOKEN`` environment variable as the dataset token.

    The connection is closed on every exit path, including when reading the
    SQL script or loading a dataset raises.
    """
    conn = sqlite3.connect(_DB_PATH)
    try:
        cursor = conn.cursor()

        with st.spinner("Loading data... Please wait..."):
            # NOTE(review): this only checks that the tables exist, not that
            # they contain rows -- confirm that is sufficient for this app.
            if _tables_exist(cursor, _TABLE_SOURCES):
                print("Database tables already populated, skipping setup")
                return
            print("Populating database tables")

            # Execute the SQL commands to create tables
            with open('create_tables.sql', 'r') as sql_file:
                cursor.executescript(sql_file.read())

            hf_token = os.getenv('HF_TOKEN')

            # Load each CSV into its table, replacing any existing contents.
            for table, data_file in _TABLE_SOURCES.items():
                df = load_dataset(
                    "vectara/ev-dataset",
                    data_files=data_file,
                    token=hf_token,
                )['train'].to_pandas()
                df.to_sql(table, conn, if_exists='replace', index=False)

            conn.commit()
    finally:
        conn.close()


if __name__ == "__main__":
    st.set_page_config(page_title="Electric Vehicles Assistant", layout="wide")
    setup_db()
    launch_bot()