# Breakfast-Poll / app.py
# MarcosRodrigo's picture
# Update app.py
# 5612612 verified
import streamlit as st
import pandas as pd
from datetime import datetime
import os
import matplotlib.pyplot as plt
import seaborn as sns
from huggingface_hub import HfApi, upload_file, list_repo_files, hf_hub_download
# Hugging Face repository settings
REPO_ID = "MarcosRodrigo/Breakfast-Poll"
HISTORY_DIR = "history"
TEMP_FILE = "current_selections.csv"

# Hugging Face API access (the token must grant write access to the repository)
hf_token = st.secrets["HF_TOKEN"]
api = HfApi()

# Seed every session-state entry the views rely on; values that already exist
# (i.e. on every rerun after the first) are left untouched.
for _state_key, _make_default in (
    ("users", list),
    ("current_selections", list),
    ("step", lambda: 1),
    ("history", list),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
def load_current_selections():
    """Return the shared selections as a DataFrame.

    Reads ``TEMP_FILE`` when it exists on disk; otherwise returns an empty
    frame with the ``Name``, ``Drinks`` and ``Food`` columns.
    """
    if not os.path.exists(TEMP_FILE):
        return pd.DataFrame(columns=["Name", "Drinks", "Food"])
    return pd.read_csv(TEMP_FILE)
def save_current_selection_to_file(current_selections):
    """Merge ``current_selections`` into the shared CSV file (``TEMP_FILE``).

    List-valued ``Drinks``/``Food`` cells are flattened to comma-separated
    strings, rows already stored in the file are kept, and exact duplicate
    rows are dropped before the file is rewritten.

    Args:
        current_selections: DataFrame of selections to add. It is not
            modified; the flattening is done on a copy.
    """
    # Work on a copy so the caller's DataFrame is not changed as a side effect.
    selections = current_selections.copy()
    for column in ("Drinks", "Food"):
        # A frame built only from unfinished records may lack a column.
        if column in selections.columns:
            selections[column] = selections[column].apply(
                lambda cell: ", ".join(cell) if isinstance(cell, list) else cell
            )

    if os.path.exists(TEMP_FILE):
        existing = pd.read_csv(TEMP_FILE)
        combined = pd.concat([existing, selections], ignore_index=True).drop_duplicates()
    else:
        combined = selections
    combined.to_csv(TEMP_FILE, index=False)
def upload_temp_file_to_repo():
    """Push the local ``TEMP_FILE`` to the Space repository.

    Does nothing when the file is not present on disk.
    """
    if not os.path.exists(TEMP_FILE):
        return
    upload_file(
        path_or_fileobj=TEMP_FILE,
        path_in_repo=TEMP_FILE,
        repo_id=REPO_ID,
        token=hf_token,
        repo_type="space",
    )
def delete_file_from_repo(filename):
    """Delete ``filename`` (e.g. ``current_selections.csv``) from the Space repository."""
    request = {
        "path_in_repo": filename,
        "repo_id": REPO_ID,
        "token": hf_token,
        "repo_type": "space",
    }
    api.delete_file(**request)
def download_temp_file_from_repo():
    """Refresh the local ``TEMP_FILE`` from the Space repository (best effort).

    The function never raises. When the download fails:

    * if no local copy exists yet, an empty CSV with the ``Name``, ``Drinks``
      and ``Food`` columns is created so later reads succeed;
    * if a local copy exists, it is kept as-is and a warning is shown,
      instead of replacing real selections with an empty file.
    """
    try:
        hf_hub_download(
            repo_id=REPO_ID,
            filename=TEMP_FILE,
            repo_type="space",
            token=hf_token,
            local_dir=".",
        )
    except Exception as exc:  # deliberate best effort: the app must stay usable
        if os.path.exists(TEMP_FILE):
            # A failed download (e.g. a network hiccup) must not wipe local data.
            st.warning(f"Could not refresh selections from the repository: {exc}")
        else:
            pd.DataFrame(columns=["Name", "Drinks", "Food"]).to_csv(TEMP_FILE, index=False)
def load_history():
    """Load every archived summary from the repository, oldest first.

    History files are the ``.txt`` files under ``HISTORY_DIR`` in the Space
    repository; each one is parsed as CSV.

    Returns:
        A list of ``{"Date": <file name without extension>, "Summary":
        <DataFrame>}`` dictionaries, sorted by file path.
    """
    files_in_repo = list_repo_files(REPO_ID, token=hf_token, repo_type="space")
    # History files are named after their creation timestamp
    # (YYYY-MM-DD_HH-MM-SS), so sorting the paths yields chronological order
    # instead of depending on whatever order the listing returns.
    history_files = sorted(
        path
        for path in files_in_repo
        if path.startswith(f"{HISTORY_DIR}/") and path.endswith(".txt")
    )

    history = []
    for repo_path in history_files:
        local_filepath = hf_hub_download(
            repo_id=REPO_ID, filename=repo_path, token=hf_token, repo_type="space"
        )
        try:
            summary_df = pd.read_csv(local_filepath)
        except pd.errors.EmptyDataError:
            # A zero-byte history file carries no summary; skip it rather than
            # failing the whole history load.
            continue
        date = repo_path.rsplit("/", 1)[-1][: -len(".txt")]
        history.append({"Date": date, "Summary": summary_df})
    return history
def save_summary_to_history():
    """Archive the current selections under ``HISTORY_DIR`` and upload the copy.

    The archive is named after the current local time
    (``YYYY-MM-DD_HH-MM-SS.txt``). When ``TEMP_FILE`` does not exist nothing
    is written or uploaded.

    Returns:
        The timestamp string used for the archive file name.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    history_filename = f"{HISTORY_DIR}/{timestamp}.txt"

    # Make sure the local history folder is there before writing into it
    if not os.path.exists(HISTORY_DIR):
        os.makedirs(HISTORY_DIR)

    if not os.path.exists(TEMP_FILE):
        return timestamp

    pd.read_csv(TEMP_FILE).to_csv(history_filename, index=False)
    upload_file(
        path_or_fileobj=history_filename,
        path_in_repo=history_filename,
        repo_id=REPO_ID,
        token=hf_token,
        repo_type="space",
    )
    return timestamp
# Load persistent history and the shared selections once per browser session.
# A dedicated flag is used as the guard: the "history" key is already created
# by the session-state initialisation earlier in this file, so testing for that
# key here would never be true and the load would never run.
if "initial_data_loaded" not in st.session_state:
    download_temp_file_from_repo()
    st.session_state.history = load_history()
    st.session_state.current_selections = load_current_selections().to_dict(orient="records")
    st.session_state.initial_data_loaded = True
# Sidebar selector that decides which view is rendered below
view_names = ["Poll", "Current", "History", "Graph"]
menu = st.sidebar.selectbox("Select View", view_names)
def reset_selections():
    """Empty the per-session ``users`` and ``current_selections`` lists."""
    for state_key in ("users", "current_selections"):
        st.session_state[state_key] = []
# Poll view: name -> drinks -> food, each step revealed once the previous one is done
if menu == "Poll":
    st.title("Breakfast Poll Application")

    # Step 1: User's name (surrounding whitespace is ignored, so a blank
    # entry such as "   " does not count as a name)
    st.header("Step 1: Enter your name")
    name = st.text_input("Name:").strip()
    if st.button("Next", key="step1_next") and name:
        st.session_state.users.append(name)
        st.session_state.step = 2  # Reveal step 2

    # Step 2 is shown only after step 1 has been completed
    if st.session_state.step >= 2:
        st.header("Step 2: Select your drink(s)")
        drinks_options = [
            "Café con leche", "Colacao", "Descafeinado con leche", "Cortado",
            "Aguasusia", "Aguasusia susia", "Café descafeinado con leche desnatada",
            "Italiano", "Café con soja", "Té", "Manzanilla", "Nada",
        ]
        selected_drinks = st.multiselect("Choose your drinks:", drinks_options)
        if st.button("Next", key="step2_next") and selected_drinks and st.session_state.users:
            pending = st.session_state.current_selections
            # Pressing "Next" again must replace the unfinished record (one
            # that has no "Food" yet) instead of stacking a second one that
            # would later be saved with an empty food column.
            if pending and "Food" not in pending[-1]:
                pending.pop()
            pending.append({"Name": st.session_state.users[-1], "Drinks": selected_drinks})
            st.session_state.step = 3  # Reveal step 3

    # Step 3 is shown only after step 2 has been completed
    if st.session_state.step >= 3:
        st.header("Step 3: Select your food(s)")
        food_options = [
            "Barrita aceite", "Barrita tomate", "Palmera chocolate",
            "Palmera chocolate blanco", "Yogurt", "Tortilla", "Nada",
        ]
        selected_food = st.multiselect("Choose your food:", food_options)
        if st.button("Save Selections", key="save_selections") and selected_food:
            if not st.session_state.current_selections or not st.session_state.users:
                # The session lists can be emptied elsewhere (e.g. after a
                # summary is submitted) while this step is still visible.
                st.warning("No drink selection found. Please start again from step 1.")
            else:
                st.session_state.current_selections[-1]["Food"] = selected_food
                df = pd.DataFrame(st.session_state.current_selections)
                save_current_selection_to_file(df)
                upload_temp_file_to_repo()
                st.success(f"Selections saved for {st.session_state.users[-1]}!")
            st.session_state.step = 1  # Back to step 1 for the next user
# History view to check past summaries
elif menu == "History":
st.title("Breakfast Poll History")
# Reload history if it's not already loaded
if not st.session_state.history:
st.session_state.history = load_history()
if st.session_state.history:
# Display history in reverse chronological order
for record in reversed(st.session_state.history):
st.subheader(f"Date: {record['Date']}")
st.table(record["Summary"])
else:
st.write("No history records found.")
# # "Current" view to display the current summary of all users' selections and submit to history
# elif menu == "Current":
# st.title("Current Selections of All Users")
# if st.button("Reload Selections"):
# download_temp_file_from_repo()
# current_df = load_current_selections()
# st.table(current_df)
# if st.button("Submit Summary to History"):
# timestamp = save_summary_to_history()
# st.success(f"Summary saved to history at {timestamp}")
# st.session_state.history = load_history()
# # Clear local and remote current selections
# if os.path.exists(TEMP_FILE):
# os.remove(TEMP_FILE)
# delete_file_from_repo(TEMP_FILE) # Delete the file from the remote repo
# # Create an empty CSV to replace the deleted one
# pd.DataFrame(columns=["Name", "Drinks", "Food"]).to_csv(TEMP_FILE, index=False)
# upload_temp_file_to_repo()
# "Current" view to display the current summary of all users' selections and generate ticket
elif menu == "Current":
st.title("Current Selections of All Users")
if st.button("Reload Selections"):
download_temp_file_from_repo()
# Load the current selections from the session state or from the file
current_df = load_current_selections()
st.table(current_df)
# Define item prices
item_prices = {
"Cafés": {
"Café con leche": 1.20,
"Descafeinado con leche": 1.20,
"Cortado": 1.20,
"Aguasusia": 1.20,
"Aguasusia susia": 1.20,
"Descafeinado con leche desnatada": 1.20,
"Italiano": 1.20,
"Café con soja": 1.20,
"Café sin lactosa": 1.20,
},
"Infusiones": {
"Té": 0.90,
"Manzanilla": 0.90,
},
"Colacaos": {
"Colacao": 1.50,
},
"Palmeras": {
"Palmera chocolate": 1.50,
"Palmera chocolate blanco": 1.50,
},
"Barrita aceite": 0.65,
"Barrita tomate": 1.30,
"Napolitana": 0.65,
"Croissant": 0.65,
"Yogurt": 1.00,
"Tortilla": 1.50,
"Nada": 0.00,
}
# # Define item prices
# item_prices = {
# "Café con leche": 1.20,
# "Colacao": 1.50,
# "Café Descafeinado con leche": 1.20,
# "Cortado": 1.20,
# "Café Aguasusia": 1.20,
# "Café Aguasusia susia": 1.20,
# "Café descafeinado con leche desnatada": 1.20,
# "Café Italiano": 1.20,
# "Café con soja": 1.20,
# "Té": 0.90,
# "Manzanilla": 0.90,
# "Nada": 0.00,
# "Barrita con aceite": 0.65,
# "Barrita con tomate": 1.30,
# "Palmera de chocolate": 1.50,
# "Palmera de chocolate blanco": 1.50,
# "Yogurt": 1.00,
# "Pincho de tortilla": 1.50
# }
# Define combined prices for special combinations
combo_prices = {
"desayuno + café (aceite)": 1.85,
"desayuno + café (tomate)": 2.50,
"desayuno + café (napolitana)": 1.85,
}
# Use session state to persist ticket generation status
if "ticket_generated" not in st.session_state:
st.session_state.ticket_generated = False
# Generate Ticket Button and Logic
if st.button("Generate Ticket"):
ticket = []
# Iterate over each user's selections
drinks = []
foods = []
for _, row in current_df.iterrows():
drinks.append(row['Drinks'])
foods.append(row['Food'])
# Simplify items
drinks = ["Café" if x in item_prices["Cafés"].keys() else x for x in drinks]
drinks = ["Infusión" if x in item_prices["Infusiones"].keys() else x for x in drinks]
foods = ["Palmera" if x in item_prices["Palmeras"].keys() else x for x in foods]
# Display selections
st.write(f"Drinks: {drinks}")
st.write(f"Foods: {foods}")
# Count items
item_count = {
"Café": len([x for x in drinks if x == "Café"]),
"Infusión": len([x for x in drinks if x == "Infusión"]),
"Colacao": len([x for x in drinks if x == "Colacao"]),
"Barrita aceite": len([x for x in foods if x == "Barrita aceite"]),
"Barrita tomate": len([x for x in foods if x == "Barrita tomate"]),
"Napolitana": len([x for x in foods if x == "Napolitana"]),
"Palmera": len([x for x in foods if x == "Palmeras"]),
"Tortilla": len([x for x in foods if x == "Tortilla"]),
"Croissant": len([x for x in foods if x == "Croissant"]),
"Yogurt": len([x for x in foods if x == "Yogurt"]),
}
# Make associations
item_association = {}
food_count = item_count["Barrita aceite"] + item_count["Barrita tomate"] + item_count["Napolitana"] + item_count["Croissant"]
if item_count["Café"] >= food_count:
item_association["Desayuno + Café (tomate)"] = item_count["Barrita tomate"]
item_association["Desayuno + Café (aceite)"] = item_count["Barrita aceite"]
item_association["Desayuno + Café (napolitana)"] = item_count["Napolitana"]
item_association["Desayuno + Café (croissant)"] = item_count["Croissant"]
item_association["Café"] = item_count["Café"] - food_count
else:
raise NotImplementedError
item_association["Colacaos"] = item_count["Colacao"]
item_association["Yogurts"] = item_count["Yogurt"]
item_association["Tortillas"] = item_count["Tortilla"]
item_association["Palmeras"] = item_count["Palmera"]
item_association["Infusiones"] = item_count["Infusión"]
# Display total values
for key, value in item_association.items():
if value > 0:
st.write(f"{key} x {value}")
# drinks = row['Drinks'].split(", ") if isinstance(row['Drinks'], str) else []
# food = row['Food'].split(", ") if isinstance(row['Food'], str) else []
# used_drinks = set()
# used_food = set()
# # Handle combinations of café + barrita con aceite
# for drink in drinks:
# if "café" in drink.lower() and "Barrita con aceite" in food:
# ticket.append({"Item": "desayuno + café (aceite)", "Price": combo_prices["desayuno + café (aceite)"]})
# used_drinks.add(drink)
# used_food.add("Barrita con aceite")
# break
# # Handle combinations of café + barrita con tomate
# for drink in drinks:
# if "café" in drink.lower() and "Barrita con tomate" in food and drink not in used_drinks:
# ticket.append({"Item": "desayuno + café (tomate)", "Price": combo_prices["desayuno + café (tomate)"]})
# used_drinks.add(drink)
# used_food.add("Barrita con tomate")
# break
# # Add remaining individual drinks not used in combinations
# for drink in drinks:
# if drink not in used_drinks and drink in item_prices:
# ticket.append({"Item": drink, "Price": item_prices[drink]})
# used_drinks.add(drink)
# # Add remaining individual food not used in combinations
# for f in food:
# if f not in used_food and f in item_prices:
# ticket.append({"Item": f, "Price": item_prices[f]})
# used_food.add(f)
# Create a DataFrame to display the ticket
ticket_df = pd.DataFrame(ticket)
# Format prices to show only 2 decimals
ticket_df["Price"] = ticket_df["Price"].apply(lambda x: f"{x:.2f}")
st.subheader("Generated Ticket")
st.table(ticket_df)
# Calculate and display the total price
total_price = sum([float(price) for price in ticket_df["Price"]])
st.write(f"**Total Price:** {total_price:.2f} €")
# Set ticket_generated to True in session state
st.session_state.ticket_generated = True
# Only show the "Submit Summary to History" button if a ticket is generated
if st.session_state.ticket_generated:
if st.button("Submit Summary to History"):
timestamp = save_summary_to_history()
st.success(f"Summary saved to history at {timestamp}")
st.session_state.history = load_history()
# Clear local and remote current selections
if os.path.exists(TEMP_FILE):
os.remove(TEMP_FILE)
delete_file_from_repo(TEMP_FILE)
# Create an empty CSV to replace the deleted one
pd.DataFrame(columns=["Name", "Drinks", "Food"]).to_csv(TEMP_FILE, index=False)
upload_temp_file_to_repo()
# Reset session state for current selections and ticket generation status
st.session_state.current_selections = []
st.session_state.ticket_generated = False
# Reload the current selections to show an empty table
current_df = pd.DataFrame(columns=["Name", "Drinks", "Food"])
st.table(current_df)
# NOTE: a second `elif menu == "History":` branch used to be here. It was an
# exact copy of the History branch earlier in this if/elif chain and could
# never run, because the first matching branch always wins; it was removed.
# Graph view to display a line chart of item selections over time
elif menu == "Graph":
st.title("Breakfast Poll History - Graph View")
# Load the history if not already loaded
if not st.session_state.history:
st.session_state.history = load_history()
# Prepare data for plotting
if st.session_state.history:
history_data = []
user_data = {} # Store user-specific data
for record in st.session_state.history:
# Extract only the date part (YYYY-MM-DD) for display
date = record['Date'].split("_")[0] # Use only the YYYY-MM-DD portion of the date
for index, row in record['Summary'].iterrows():
user = row['Name']
for drink in row['Drinks'].split(', '):
history_data.append({'Date': date, 'Item': drink, 'Type': 'Drink', 'User': user})
for food in row['Food'].split(', '):
history_data.append({'Date': date, 'Item': food, 'Type': 'Food', 'User': user})
# Append user data for selection
if user not in user_data:
user_data[user] = True # Initialize all users as visible by default
# Create a DataFrame from history data
history_df = pd.DataFrame(history_data)
# Count occurrences of each item per date
item_counts = history_df.groupby(['Date', 'Item', 'Type', 'User']).size().reset_index(name='Count')
# Separate items into Drinks and Food, and sort them alphabetically
drinks = sorted(item_counts[item_counts['Type'] == 'Drink']['Item'].unique())
foods = sorted(item_counts[item_counts['Type'] == 'Food']['Item'].unique())
# Create a dictionary to store the checkbox values for each item
item_visibility = {}
# Create interactive checkboxes for Drinks, Food, and Users in the sidebar
st.sidebar.header("Select Items to Display")
# Drinks Section
if drinks:
st.sidebar.subheader("Drinks")
for item in drinks:
# Add a unique key to each checkbox to avoid duplicate widget IDs
item_visibility[item] = st.sidebar.checkbox(item, value=True, key=f"checkbox_{item}_Drink")
# Food Section
if foods:
st.sidebar.subheader("Food")
for item in foods:
# Add a unique key to each checkbox to avoid duplicate widget IDs
item_visibility[item] = st.sidebar.checkbox(item, value=True, key=f"checkbox_{item}_Food")
# User Section: Create a checkbox for each user to toggle their visibility
st.sidebar.subheader("Users")
for user in user_data.keys():
user_data[user] = st.sidebar.checkbox(user, value=True, key=f"checkbox_user_{user}")
# Filter the data based on selected items and users
selected_items = [item for item, visible in item_visibility.items() if visible]
selected_users = [user for user, visible in user_data.items() if visible]
filtered_item_counts = item_counts[item_counts['Item'].isin(selected_items) & item_counts['User'].isin(selected_users)]
# Check if there is data to display
if not filtered_item_counts.empty:
# Create a line plot for each selected item over time
plt.figure(figsize=(12, 6))
sns.lineplot(data=filtered_item_counts, x='Date', y='Count', hue='Item', marker='o')
# Customize the y-axis to show only integer labels
y_max = max(filtered_item_counts['Count'].max() + 1, 1) # Set y_max to at least 1 to avoid errors
plt.yticks(range(0, y_max)) # Show only integer labels on the y-axis
# Customize the plot
plt.xticks(rotation=45)
plt.title('Item Selections Over Time')
plt.xlabel('Date')
plt.ylabel('Number of Selections')
plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=3)
# Display the plot
st.pyplot(plt.gcf())
else:
st.write("No data to display. Please select at least one user and one item.")
else:
st.write("No historical data available to plot.")