Spaces:
Running
Running
import streamlit as st | |
import pandas as pd | |
from datetime import datetime | |
import os | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
from huggingface_hub import HfApi, upload_file, list_repo_files, hf_hub_download | |
# Configuration for Hugging Face Repository | |
REPO_ID = "MarcosRodrigo/Breakfast-Poll" | |
HISTORY_DIR = "history" | |
TEMP_FILE = "current_selections.csv" | |
# Hugging Face API (requires a token with write access) | |
hf_token = st.secrets["HF_TOKEN"] | |
api = HfApi() | |
# Initialize all required session state variables | |
if "users" not in st.session_state: | |
st.session_state.users = [] | |
if "current_selections" not in st.session_state: | |
st.session_state.current_selections = [] | |
if "step" not in st.session_state: | |
st.session_state.step = 1 | |
if "history" not in st.session_state: | |
st.session_state.history = [] | |
# Load temporary selections from the shared file | |
def load_current_selections(): | |
if os.path.exists(TEMP_FILE): | |
return pd.read_csv(TEMP_FILE) | |
else: | |
return pd.DataFrame(columns=["Name", "Drinks", "Food"]) | |
# Save current user selections to the shared CSV file without overwriting previous data | |
def save_current_selection_to_file(current_selections): | |
current_selections["Drinks"] = current_selections["Drinks"].apply(lambda x: ", ".join(x) if isinstance(x, list) else x) | |
current_selections["Food"] = current_selections["Food"].apply(lambda x: ", ".join(x) if isinstance(x, list) else x) | |
if os.path.exists(TEMP_FILE): | |
existing_selections = pd.read_csv(TEMP_FILE) | |
combined_selections = pd.concat([existing_selections, current_selections]).drop_duplicates() | |
else: | |
combined_selections = current_selections | |
combined_selections.to_csv(TEMP_FILE, index=False) | |
# Upload the shared file to Hugging Face repository for persistence | |
def upload_temp_file_to_repo(): | |
if os.path.exists(TEMP_FILE): | |
upload_file( | |
path_or_fileobj=TEMP_FILE, | |
path_in_repo=TEMP_FILE, | |
repo_id=REPO_ID, | |
token=hf_token, | |
repo_type="space" | |
) | |
# Delete a file from the repository (e.g., `current_selections.csv`) | |
def delete_file_from_repo(filename): | |
api.delete_file( | |
path_in_repo=filename, | |
repo_id=REPO_ID, | |
token=hf_token, | |
repo_type="space" | |
) | |
# Download the shared file from the repository to ensure persistence and real-time updates | |
def download_temp_file_from_repo(): | |
try: | |
hf_hub_download(repo_id=REPO_ID, filename=TEMP_FILE, repo_type="space", token=hf_token, local_dir=".") | |
except Exception: | |
pd.DataFrame(columns=["Name", "Drinks", "Food"]).to_csv(TEMP_FILE, index=False) | |
# Load history from the repository | |
def load_history(): | |
history = [] | |
files_in_repo = list_repo_files(REPO_ID, token=hf_token, repo_type="space") | |
history_files = [f for f in files_in_repo if f.startswith(f"{HISTORY_DIR}/") and f.endswith(".txt")] | |
for file in history_files: | |
local_filepath = hf_hub_download(repo_id=REPO_ID, filename=file, token=hf_token, repo_type="space") | |
summary_df = pd.read_csv(local_filepath) | |
date = file.split("/")[-1].split(".txt")[0] | |
history.append({"Date": date, "Summary": summary_df}) | |
return history | |
# Save the current summary to a text file in the history directory | |
def save_summary_to_history(): | |
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") | |
history_filename = f"{HISTORY_DIR}/{timestamp}.txt" | |
if not os.path.exists(HISTORY_DIR): | |
os.makedirs(HISTORY_DIR) | |
if os.path.exists(TEMP_FILE): | |
summary_df = pd.read_csv(TEMP_FILE) | |
summary_df.to_csv(history_filename, index=False) | |
upload_file(path_or_fileobj=history_filename, path_in_repo=history_filename, repo_id=REPO_ID, token=hf_token, repo_type="space") | |
return timestamp | |
# Load persistent history and temporary selections on app start | |
if "history" not in st.session_state: | |
download_temp_file_from_repo() | |
st.session_state.history = load_history() | |
st.session_state.current_selections = load_current_selections().to_dict(orient="records") | |
# Sidebar for navigating through different views | |
menu = st.sidebar.selectbox("Select View", ["Poll", "Current", "History", "Graph"]) | |
# Function to reset the current selections after submission | |
def reset_selections(): | |
st.session_state.users = [] | |
st.session_state.current_selections = [] | |
# Poll view with four consecutive steps | |
if menu == "Poll": | |
st.title("Breakfast Poll Application") | |
# Step 1: User's Name | |
st.header("Step 1: Enter your name") | |
name = st.text_input("Name:") | |
if st.button("Next", key="step1_next") and name: | |
st.session_state.users.append(name) | |
st.session_state.step = 2 # Set the next step to be visible | |
# Show Step 2 only if Step 1 is completed | |
if st.session_state.step >= 2: | |
st.header("Step 2: Select your drink(s)") | |
drinks_options = [ | |
"Café con leche", "Colacao", "Descafeinado con leche", "Cortado", | |
"Aguasusia", "Aguasusia susia", "Café descafeinado con leche desnatada", | |
"Italiano", "Café con soja", "Té", "Manzanilla", "Nada" | |
] | |
selected_drinks = st.multiselect("Choose your drinks:", drinks_options) | |
if st.button("Next", key="step2_next") and selected_drinks: | |
st.session_state.current_selections.append({"Name": st.session_state.users[-1], "Drinks": selected_drinks}) | |
st.session_state.step = 3 # Set the next step to be visible | |
# Show Step 3 only if Step 2 is completed | |
if st.session_state.step >= 3: | |
st.header("Step 3: Select your food(s)") | |
food_options = [ | |
"Barrita con aceite", "Barrita con tomate", "Palmera de chocolate", | |
"Palmera de chocolate blanco", "Yogurt", "Pincho de tortilla", "Nada" | |
] | |
selected_food = st.multiselect("Choose your food:", food_options) | |
if st.button("Save Selections", key="save_selections") and selected_food: | |
st.session_state.current_selections[-1]["Food"] = selected_food | |
df = pd.DataFrame(st.session_state.current_selections) | |
save_current_selection_to_file(df) | |
upload_temp_file_to_repo() | |
st.success(f"Selections saved for {st.session_state.users[-1]}!") | |
st.session_state.step = 1 # Reset to step 1 for the next user | |
# "Current" view to display the current summary of all users' selections and submit to history | |
elif menu == "Current": | |
st.title("Current Selections of All Users") | |
if st.button("Reload Selections"): | |
download_temp_file_from_repo() | |
current_df = load_current_selections() | |
st.table(current_df) | |
if st.button("Submit Summary to History"): | |
timestamp = save_summary_to_history() | |
st.success(f"Summary saved to history at {timestamp}") | |
st.session_state.history = load_history() | |
# Clear local and remote current selections | |
if os.path.exists(TEMP_FILE): | |
os.remove(TEMP_FILE) | |
delete_file_from_repo(TEMP_FILE) # Delete the file from the remote repo | |
# Create an empty CSV to replace the deleted one | |
pd.DataFrame(columns=["Name", "Drinks", "Food"]).to_csv(TEMP_FILE, index=False) | |
upload_temp_file_to_repo() | |
# st.experimental_set_query_params(step="reset") | |
# History view to check past summaries | |
elif menu == "History": | |
st.title("Breakfast Poll History") | |
# Reload history if it's not already loaded | |
if not st.session_state.history: | |
st.session_state.history = load_history() | |
if st.session_state.history: | |
# Display history in reverse chronological order | |
for record in reversed(st.session_state.history): | |
st.subheader(f"Date: {record['Date']}") | |
st.table(record["Summary"]) | |
else: | |
st.write("No history records found.") | |
# # "Current" view to display the current summary of all users' selections and submit to history | |
# elif menu == "Current": | |
# st.title("Current Selections of All Users") | |
# if st.button("Reload Selections"): | |
# download_temp_file_from_repo() | |
# current_df = load_current_selections() | |
# st.table(current_df) | |
# if st.button("Submit Summary to History"): | |
# timestamp = save_summary_to_history() | |
# st.success(f"Summary saved to history at {timestamp}") | |
# st.session_state.history = load_history() | |
# # Clear local and remote current selections | |
# if os.path.exists(TEMP_FILE): | |
# os.remove(TEMP_FILE) | |
# delete_file_from_repo(TEMP_FILE) # Delete the file from the remote repo | |
# # Create an empty CSV to replace the deleted one | |
# pd.DataFrame(columns=["Name", "Drinks", "Food"]).to_csv(TEMP_FILE, index=False) | |
# upload_temp_file_to_repo() | |
# "Current" view to display the current summary of all users' selections and generate ticket | |
elif menu == "Current": | |
st.title("Current Selections of All Users") | |
if st.button("Reload Selections"): | |
download_temp_file_from_repo() | |
current_df = load_current_selections() | |
st.table(current_df) | |
# Define item prices | |
item_prices = { | |
"Café con leche": 1.20, "Colacao": 1.00, "Descafeinado con leche": 1.20, "Cortado": 1.20, | |
"Aguasusia": 1.20, "Aguasusia susia": 1.20, "Café descafeinado con leche desnatada": 1.20, | |
"Italiano": 1.20, "Café con soja": 1.20, "Té": 1.00, "Manzanilla": 1.00, "Nada": 0.00, | |
"Barrita con aceite": 1.00, "Barrita con tomate": 1.00, "Palmera de chocolate": 1.00, | |
"Palmera de chocolate blanco": 1.00, "Yogurt": 1.00, "Pincho de tortilla": 1.00 | |
} | |
# Define combined prices for special combinations | |
combo_prices = { | |
"desayuno + café (aceite)": 1.85, | |
"desayuno + café (tomate)": 2.50 | |
} | |
# Generate Ticket | |
if st.button("Generate Ticket"): | |
# Create a list to hold the ticket items | |
ticket = [] | |
# Iterate over each user's selections | |
for _, row in current_df.iterrows(): | |
drinks = row['Drinks'].split(", ") if isinstance(row['Drinks'], str) else [] | |
food = row['Food'].split(", ") if isinstance(row['Food'], str) else [] | |
# Track which items have been used in combinations | |
used_drinks = set() | |
used_food = set() | |
# Handle combinations of café + barrita con aceite | |
for drink in drinks: | |
if "café" in drink.lower() and "Barrita con aceite" in food: | |
ticket.append({"Item": "desayuno + café (aceite)", "Price": combo_prices["desayuno + café (aceite)"]}) | |
used_drinks.add(drink) | |
used_food.add("Barrita con aceite") | |
break # Only one combo per user for this | |
# Handle combinations of café + barrita con tomate | |
for drink in drinks: | |
if "café" in drink.lower() and "Barrita con tomate" in food and drink not in used_drinks: | |
ticket.append({"Item": "desayuno + café (tomate)", "Price": combo_prices["desayuno + café (tomate)"]}) | |
used_drinks.add(drink) | |
used_food.add("Barrita con tomate") | |
break # Only one combo per user for this | |
# Add remaining individual drinks not used in combinations | |
for drink in drinks: | |
if drink not in used_drinks and drink in item_prices: | |
ticket.append({"Item": drink, "Price": item_prices[drink]}) | |
used_drinks.add(drink) | |
# Add remaining individual food not used in combinations | |
for f in food: | |
if f not in used_food and f in item_prices: | |
ticket.append({"Item": f, "Price": item_prices[f]}) | |
used_food.add(f) | |
# Create a DataFrame to display the ticket | |
ticket_df = pd.DataFrame(ticket) | |
st.subheader("Generated Ticket") | |
st.table(ticket_df) | |
# Calculate and display the total price | |
total_price = ticket_df["Price"].sum() | |
st.write(f"**Total Price:** {total_price:.2f} €") | |
# History view to check past summaries | |
elif menu == "History": | |
st.title("Breakfast Poll History") | |
# Reload history if it's not already loaded | |
if not st.session_state.history: | |
st.session_state.history = load_history() | |
if st.session_state.history: | |
# Display history in reverse chronological order | |
for record in reversed(st.session_state.history): | |
st.subheader(f"Date: {record['Date']}") | |
st.table(record["Summary"]) | |
else: | |
st.write("No history records found.") | |
# Graph view to display a line chart of item selections over time | |
elif menu == "Graph": | |
st.title("Breakfast Poll History - Graph View") | |
# Load the history if not already loaded | |
if not st.session_state.history: | |
st.session_state.history = load_history() | |
# Prepare data for plotting | |
if st.session_state.history: | |
history_data = [] | |
user_data = {} # Store user-specific data | |
for record in st.session_state.history: | |
# Extract only the date part (YYYY-MM-DD) for display | |
date = record['Date'].split("_")[0] # Use only the YYYY-MM-DD portion of the date | |
for index, row in record['Summary'].iterrows(): | |
user = row['Name'] | |
for drink in row['Drinks'].split(', '): | |
history_data.append({'Date': date, 'Item': drink, 'Type': 'Drink', 'User': user}) | |
for food in row['Food'].split(', '): | |
history_data.append({'Date': date, 'Item': food, 'Type': 'Food', 'User': user}) | |
# Append user data for selection | |
if user not in user_data: | |
user_data[user] = True # Initialize all users as visible by default | |
# Create a DataFrame from history data | |
history_df = pd.DataFrame(history_data) | |
# Count occurrences of each item per date | |
item_counts = history_df.groupby(['Date', 'Item', 'Type', 'User']).size().reset_index(name='Count') | |
# Separate items into Drinks and Food, and sort them alphabetically | |
drinks = sorted(item_counts[item_counts['Type'] == 'Drink']['Item'].unique()) | |
foods = sorted(item_counts[item_counts['Type'] == 'Food']['Item'].unique()) | |
# Create a dictionary to store the checkbox values for each item | |
item_visibility = {} | |
# Create interactive checkboxes for Drinks, Food, and Users in the sidebar | |
st.sidebar.header("Select Items to Display") | |
# Drinks Section | |
if drinks: | |
st.sidebar.subheader("Drinks") | |
for item in drinks: | |
# Add a unique key to each checkbox to avoid duplicate widget IDs | |
item_visibility[item] = st.sidebar.checkbox(item, value=True, key=f"checkbox_{item}_Drink") | |
# Food Section | |
if foods: | |
st.sidebar.subheader("Food") | |
for item in foods: | |
# Add a unique key to each checkbox to avoid duplicate widget IDs | |
item_visibility[item] = st.sidebar.checkbox(item, value=True, key=f"checkbox_{item}_Food") | |
# User Section: Create a checkbox for each user to toggle their visibility | |
st.sidebar.subheader("Users") | |
for user in user_data.keys(): | |
user_data[user] = st.sidebar.checkbox(user, value=True, key=f"checkbox_user_{user}") | |
# Filter the data based on selected items and users | |
selected_items = [item for item, visible in item_visibility.items() if visible] | |
selected_users = [user for user, visible in user_data.items() if visible] | |
filtered_item_counts = item_counts[item_counts['Item'].isin(selected_items) & item_counts['User'].isin(selected_users)] | |
# Check if there is data to display | |
if not filtered_item_counts.empty: | |
# Create a line plot for each selected item over time | |
plt.figure(figsize=(12, 6)) | |
sns.lineplot(data=filtered_item_counts, x='Date', y='Count', hue='Item', marker='o') | |
# Customize the y-axis to show only integer labels | |
y_max = max(filtered_item_counts['Count'].max() + 1, 1) # Set y_max to at least 1 to avoid errors | |
plt.yticks(range(0, y_max)) # Show only integer labels on the y-axis | |
# Customize the plot | |
plt.xticks(rotation=45) | |
plt.title('Item Selections Over Time') | |
plt.xlabel('Date') | |
plt.ylabel('Number of Selections') | |
plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=3) | |
# Display the plot | |
st.pyplot(plt.gcf()) | |
else: | |
st.write("No data to display. Please select at least one user and one item.") | |
else: | |
st.write("No historical data available to plot.") | |