Spaces:
Paused
Paused
| import atexit | |
| import Levenshtein | |
| import os | |
| import sys | |
| sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir)) | |
| import streamlit as st | |
| from streamlit_chat import message | |
| from query_methods import query, avail_query_methods | |
| import pickle | |
| conversations_file = "conversations.pkl" | |
| def load_conversations(): | |
| try: | |
| with open(conversations_file, "rb") as f: | |
| return pickle.load(f) | |
| except FileNotFoundError: | |
| return [] | |
| except EOFError: | |
| return [] | |
| def save_conversations(conversations, current_conversation): | |
| updated = False | |
| for idx, conversation in enumerate(conversations): | |
| if conversation == current_conversation: | |
| conversations[idx] = current_conversation | |
| updated = True | |
| break | |
| if not updated: | |
| conversations.append(current_conversation) | |
| temp_conversations_file = "temp_" + conversations_file | |
| with open(temp_conversations_file, "wb") as f: | |
| pickle.dump(conversations, f) | |
| os.replace(temp_conversations_file, conversations_file) | |
| def delete_conversation(conversations, current_conversation): | |
| for idx, conversation in enumerate(conversations): | |
| conversations[idx] = current_conversation | |
| break | |
| conversations.remove(current_conversation) | |
| temp_conversations_file = "temp_" + conversations_file | |
| with open(temp_conversations_file, "wb") as f: | |
| pickle.dump(conversations, f) | |
| os.replace(temp_conversations_file, conversations_file) | |
| def exit_handler(): | |
| print("Exiting, saving data...") | |
| # Perform cleanup operations here, like saving data or closing open files. | |
| save_conversations(st.session_state.conversations, st.session_state.current_conversation) | |
| # Register the exit_handler function to be called when the program is closing. | |
| atexit.register(exit_handler) | |
| st.header("Chat Placeholder") | |
| if 'conversations' not in st.session_state: | |
| st.session_state['conversations'] = load_conversations() | |
| if 'input_text' not in st.session_state: | |
| st.session_state['input_text'] = '' | |
| if 'selected_conversation' not in st.session_state: | |
| st.session_state['selected_conversation'] = None | |
| if 'input_field_key' not in st.session_state: | |
| st.session_state['input_field_key'] = 0 | |
| if 'query_method' not in st.session_state: | |
| st.session_state['query_method'] = query | |
| if 'search_query' not in st.session_state: | |
| st.session_state['search_query'] = '' | |
| # Initialize new conversation | |
| if 'current_conversation' not in st.session_state or st.session_state['current_conversation'] is None: | |
| st.session_state['current_conversation'] = {'user_inputs': [], 'generated_responses': []} | |
| input_placeholder = st.empty() | |
| user_input = input_placeholder.text_input( | |
| 'You:', value=st.session_state['input_text'], key=f'input_text_-1'#{st.session_state["input_field_key"]} | |
| ) | |
| submit_button = st.button("Submit") | |
| if (user_input and user_input != st.session_state['input_text']) or submit_button: | |
| output = query(user_input, st.session_state['query_method']) | |
| escaped_output = output.encode('utf-8').decode('unicode-escape') | |
| st.session_state['current_conversation']['user_inputs'].append(user_input) | |
| st.session_state.current_conversation['generated_responses'].append(escaped_output) | |
| save_conversations(st.session_state.conversations, st.session_state.current_conversation) | |
| st.session_state['input_text'] = '' | |
| st.session_state['input_field_key'] += 1 # Increment key value for new widget | |
| user_input = input_placeholder.text_input( | |
| 'You:', value=st.session_state['input_text'], key=f'input_text_{st.session_state["input_field_key"]}' | |
| ) # Clear the input field | |
| # Add a button to create a new conversation | |
| if st.sidebar.button("New Conversation"): | |
| st.session_state['selected_conversation'] = None | |
| st.session_state['current_conversation'] = {'user_inputs': [], 'generated_responses': []} | |
| st.session_state['input_field_key'] += 1 # Increment key value for new widget | |
| st.session_state['query_method'] = st.sidebar.selectbox("Select API:", options=avail_query_methods, index=0) | |
| # Proxy | |
| st.session_state['proxy'] = st.sidebar.text_input("Proxy: ") | |
| # Searchbar | |
| search_query = st.sidebar.text_input("Search Conversations:", value=st.session_state.get('search_query', ''), key='search') | |
| if search_query: | |
| filtered_conversations = [] | |
| indices = [] | |
| for idx, conversation in enumerate(st.session_state.conversations): | |
| if search_query in conversation['user_inputs'][0]: | |
| filtered_conversations.append(conversation) | |
| indices.append(idx) | |
| filtered_conversations = list(zip(indices, filtered_conversations)) | |
| conversations = sorted(filtered_conversations, key=lambda x: Levenshtein.distance(search_query, x[1]['user_inputs'][0])) | |
| sidebar_header = f"Search Results ({len(conversations)})" | |
| else: | |
| conversations = st.session_state.conversations | |
| sidebar_header = "Conversation History" | |
| # Sidebar | |
| st.sidebar.header(sidebar_header) | |
| sidebar_col1, sidebar_col2 = st.sidebar.columns([5,1]) | |
| for idx, conversation in enumerate(conversations): | |
| if sidebar_col1.button(f"Conversation {idx + 1}: {conversation['user_inputs'][0]}", key=f"sidebar_btn_{idx}"): | |
| st.session_state['selected_conversation'] = idx | |
| st.session_state['current_conversation'] = conversation | |
| if sidebar_col2.button('🗑️', key=f"sidebar_btn_delete_{idx}"): | |
| if st.session_state['selected_conversation'] == idx: | |
| st.session_state['selected_conversation'] = None | |
| st.session_state['current_conversation'] = {'user_inputs': [], 'generated_responses': []} | |
| delete_conversation(conversations, conversation) | |
| st.experimental_rerun() | |
| if st.session_state['selected_conversation'] is not None: | |
| conversation_to_display = conversations[st.session_state['selected_conversation']] | |
| else: | |
| conversation_to_display = st.session_state.current_conversation | |
| if conversation_to_display['generated_responses']: | |
| for i in range(len(conversation_to_display['generated_responses']) - 1, -1, -1): | |
| message(conversation_to_display["generated_responses"][i], key=f"display_generated_{i}") | |
| message(conversation_to_display['user_inputs'][i], is_user=True, key=f"display_user_{i}") |