import streamlit as st
import pandas as pd
from datetime import datetime
import numpy as np
import pmdarima as pm
from pmdarima import auto_arima
import torch
from transformers import pipeline, TapasTokenizer, TapasForQuestionAnswering

st.set_page_config(
    page_title="Sales Forecasting System",
    page_icon="📈",
    layout="wide",
    initial_sidebar_state="expanded",
)
# Preprocessing
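# The helpers below keep only the Date/Sales columns, parse and clean the dates,
# sort the rows chronologically with a merge sort, and average sales into 3-day buckets.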
# Merge two date-sorted halves B and C back into A (the merge step of merge sort).
def merge(B, C, A):
    i = j = k = 0
    # Convert 'Date' columns to datetime.date objects
    B['Date'] = pd.to_datetime(B['Date']).dt.date
    C['Date'] = pd.to_datetime(C['Date']).dt.date
    A['Date'] = pd.to_datetime(A['Date']).dt.date
    # Use positional column indices so assignments write into A directly
    # instead of going through chained indexing.
    date_col = A.columns.get_loc('Date')
    sales_col = A.columns.get_loc('Sales')
    while i < len(B) and j < len(C):
        if B['Date'].iloc[i] <= C['Date'].iloc[j]:
            A.iloc[k, date_col] = B['Date'].iloc[i]
            A.iloc[k, sales_col] = B['Sales'].iloc[i]
            i += 1
        else:
            A.iloc[k, date_col] = C['Date'].iloc[j]
            A.iloc[k, sales_col] = C['Sales'].iloc[j]
            j += 1
        k += 1
    # Copy over whatever remains in either half.
    while i < len(B):
        A.iloc[k, date_col] = B['Date'].iloc[i]
        A.iloc[k, sales_col] = B['Sales'].iloc[i]
        i += 1
        k += 1
    while j < len(C):
        A.iloc[k, date_col] = C['Date'].iloc[j]
        A.iloc[k, sales_col] = C['Sales'].iloc[j]
        j += 1
        k += 1
    return A

# Recursive merge sort over the DataFrame, ordering rows by 'Date'.
def merge_sort(dataframe):
    if len(dataframe) > 1:
        center = len(dataframe) // 2
        # Sort copies of each half so the recursion does not write into views of the parent frame.
        left = merge_sort(dataframe.iloc[:center].copy())
        right = merge_sort(dataframe.iloc[center:].copy())
        return merge(left, right, dataframe)
    else:
        return dataframe
# Keep only the Date and Sales columns and drop rows with missing values.
def drop(dataframe):
    def get_columns_containing(dataframe, substrings):
        return [col for col in dataframe.columns if any(substring.lower() in col.lower() for substring in substrings)]
    columns_to_keep = get_columns_containing(dataframe, ["date", "sale"])
    dataframe = dataframe.drop(columns=dataframe.columns.difference(columns_to_keep))
    dataframe = dataframe.dropna()
    return dataframe
# Strip whitespace and parse 'Date' strings (MM/DD/YYYY) into datetime.date objects.
def date_format(dataframe):
    dataframe['Date'] = dataframe['Date'].str.strip()
    dataframe['Date'] = pd.to_datetime(dataframe['Date'], format="%m/%d/%Y").dt.date
    return dataframe
# Resample sales into 3-day buckets (mean), dropping empty buckets.
def group_to_three(dataframe):
    dataframe['Date'] = pd.to_datetime(dataframe['Date'])
    dataframe = dataframe.groupby([pd.Grouper(key='Date', freq='3D')])['Sales'].mean().round(2)
    dataframe = dataframe.replace(0, np.nan).dropna()
    return dataframe
# SARIMAX Model
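# Two auto_arima fits are used: test_fitting on the training split (scored against the
# hold-out via forecast_accuracy) and model_fitting on the full series for future forecasts;
# both search seasonal orders (m=12) with differencing guided by the ADF test.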
# Split the data: the last n rows form the hold-out set; any remaining columns act as exogenous features.
def train_test(dataframe, n):
    training_y = dataframe.iloc[:-n, 0]
    test_y = dataframe.iloc[-n:, 0]
    test_y_series = pd.Series(test_y, index=dataframe.iloc[-n:, 0].index)
    training_X = dataframe.iloc[:-n, 1:]
    test_X = dataframe.iloc[-n:, 1:]
    future_X = dataframe.iloc[0:, 1:]
    return (training_y, test_y, test_y_series, training_X, test_X, future_X)
# Fit a seasonal ARIMA on the full series for forecasting future sales.
def model_fitting(dataframe, Exo):
    futureModel = pm.auto_arima(dataframe['Sales'], X=Exo, start_p=1, start_q=1,
                                test='adf', min_p=1, min_q=1,
                                max_p=3, max_q=3, m=12,
                                start_P=0, seasonal=True,
                                d=None, D=1, trace=True,
                                error_action='ignore',
                                suppress_warnings=True,
                                stepwise=True)
    return futureModel
# Fit a seasonal ARIMA on the training split only, for evaluation against the hold-out.
def test_fitting(dataframe, Exo, trainY):
    trainTestModel = auto_arima(X=Exo, y=trainY, start_p=1, start_q=1,
                                test='adf', min_p=1, min_q=1,
                                max_p=3, max_q=3, m=12,
                                start_P=0, seasonal=True,
                                d=None, D=1, trace=True,
                                error_action='ignore',
                                suppress_warnings=True,
                                stepwise=True)
    return trainTestModel
# Accuracy metrics comparing the forecast against the actual hold-out values.
def forecast_accuracy(forecast, actual):
    forecast = np.asarray(forecast)
    actual = np.asarray(actual)
    mape = np.mean(np.abs(forecast - actual) / np.abs(actual)).round(4)   # Mean Absolute Percentage Error
    rmse = (np.mean((forecast - actual) ** 2) ** .5).round(2)             # Root Mean Squared Error
    corr = np.corrcoef(forecast, actual)[0, 1]                            # Correlation
    mins = np.amin(np.hstack([forecast[:, None], actual[:, None]]), axis=1)
    maxs = np.amax(np.hstack([forecast[:, None], actual[:, None]]), axis=1)
    minmax = 1 - np.mean(mins / maxs)                                     # Min-Max error
    return {'mape': mape, 'rmse': rmse, 'corr': corr, 'min-max': minmax}
# Build a table of forecasted sales with first differences and growth percentages.
def sales_growth(dataframe, fittedValues):
    sales_growth = fittedValues.to_frame()
    sales_growth = sales_growth.reset_index()
    sales_growth.columns = ("Date", "Sales")
    sales_growth = sales_growth.set_index('Date')
    sales_growth['Sales'] = sales_growth['Sales'].round(2)
    # Calculate and create the columns for sales difference and growth
    sales_growth['Forecasted Sales First Difference'] = (sales_growth['Sales'] - sales_growth['Sales'].shift(1)).round(2)
    sales_growth['Forecasted Sales Growth'] = (((sales_growth['Sales'] - sales_growth['Sales'].shift(1)) / sales_growth['Sales'].shift(1)) * 100).round(2)
    # Fill the first row from the last two observed values (growth is relative to the previous value)
    sales_growth.loc[sales_growth.index[0], 'Forecasted Sales First Difference'] = (dataframe['Sales'].iloc[-1] - dataframe['Sales'].iloc[-2]).round(2)
    sales_growth.loc[sales_growth.index[0], 'Forecasted Sales Growth'] = (((dataframe['Sales'].iloc[-1] - dataframe['Sales'].iloc[-2]) / dataframe['Sales'].iloc[-2]) * 100).round(2)
    return sales_growth
# TAPAS Model
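# TAPAS answers natural-language questions against a table: the pipeline below returns
# selected cells plus an aggregator (SUM, AVERAGE, COUNT, or NONE), which is then
# converted into a single numeric answer.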
@st.cache_resource
def load_tapas_model():
    # Cache the TAPAS model so it is not reloaded on every Streamlit rerun.
    model_name = "google/tapas-large-finetuned-wtq"
    tokenizer = TapasTokenizer.from_pretrained(model_name)
    model = TapasForQuestionAnswering.from_pretrained(model_name, local_files_only=False)
    return pipeline("table-question-answering", model=model, tokenizer=tokenizer)

pipe = load_tapas_model()
def get_answer(table, query):
    answers = pipe(table=table, query=query)
    print(answers['coordinates'])  # FOR DEBUGGING PURPOSES
    return answers
# Convert the raw TAPAS answer into a single numeric value based on its aggregator.
def convert_answer(answer):
    if answer['aggregator'] == 'SUM':
        print(answer['answer'])  # FOR DEBUGGING
        cells = answer['cells']
        return sum(float(value.replace(',', '')) for value in cells)
    if answer['aggregator'] == 'AVERAGE':
        print(answer['answer'])  # FOR DEBUGGING
        cells = answer['cells']
        values = [float(value.replace(',', '')) for value in cells]
        return sum(values) / len(values)
    if answer['aggregator'] == 'COUNT':
        print(answer['answer'])  # FOR DEBUGGING
        # COUNT refers to the number of selected cells, not their sum.
        return len(answer['cells'])
    # No aggregation (NONE): return the raw answer dictionary.
    return answer
def get_converted_answer(table, query):
    converted_answer = convert_answer(get_answer(table, query))
    return converted_answer
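# Illustrative usage (assumptions: `df` holds the preprocessed sales table and TAPAS
# requires every cell to be passed as a string):
# get_converted_answer(df.reset_index().astype(str), "What is the average sales?")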
st.title("Sales Forecasting Dashboard")
st.write("👋 Welcome User, start using the application by uploading your file in the sidebar!")

# Track whether a usable file has been uploaded yet.
if 'uploaded' not in st.session_state:
    st.session_state.uploaded = False
# Sidebar Menu
with st.sidebar:
    uploaded_file = st.file_uploader("Upload your Store Data here (must at least contain Date and Sales columns)", type=["csv"])
    err = 0
    if uploaded_file is not None:
        if uploaded_file.type != 'text/csv':
            err = 1
            st.info('Please upload in CSV format only...')
        else:
            st.success("File uploaded successfully!")
            df = pd.read_csv(uploaded_file, parse_dates=True)
            st.write("Your uploaded data:")
            st.write(df)
            # Data pre-processing
            df = drop(df)
            df = date_format(df)
            df = merge_sort(df)
            df = group_to_three(df)
            st.session_state.uploaded = True
    with open('sample.csv', 'rb') as f:
        st.download_button("Download our sample CSV", f, file_name='sample.csv')
if st.session_state.uploaded:
    st.line_chart(df)
    forecast_button_clicked = st.button(
        'Start Forecasting',
        key='forecast_button',
        type="primary",
        disabled=not st.session_state.uploaded,
    )
    if forecast_button_clicked:
        # TODO call arima here
        pass
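        # A possible wiring of the SARIMAX helpers defined above (illustrative sketch,
        # not the original implementation): the hold-out length `n`, the absence of
        # exogenous regressors, and the 3-day future index are assumptions.
        # df_model = df.to_frame()  # group_to_three returns a Series named 'Sales'
        # n = 30                    # hypothetical hold-out size
        # train_y, test_y, test_y_series, train_X, test_X, future_X = train_test(df_model, n)
        # test_model = test_fitting(df_model, None, train_y)
        # st.write(forecast_accuracy(test_model.predict(n_periods=n), test_y))
        # future_model = model_fitting(df_model, None)
        # fitted = pd.Series(
        #     future_model.predict(n_periods=n),
        #     index=pd.date_range(df_model.index[-1], periods=n, freq='3D'),
        # )
        # st.dataframe(sales_growth(df_model, fitted))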