Spaces:
Sleeping
Sleeping
import pandas as pd | |
from pandas.tseries.offsets import MonthEnd | |
import gradio as gr | |
import random | |
# Function to generate a random light color | |
def generate_random_light_color(): | |
min_brightness = 0.7 | |
while True: | |
r, g, b = [random.randint(128, 255) for _ in range(3)] | |
brightness = (r * 0.299 + g * 0.587 + b * 0.114) / 255 | |
if brightness >= min_brightness: | |
return '#{:02x}{:02x}{:02x}'.format(*(r, g, b)) | |
# Function to set the background color of a specific cell | |
def set_cell_color(styles_df, index, column, hex_color): | |
styles_df.at[index, column] = f'background-color: {hex_color}' | |
return styles_df | |
# Function to calculate the threshold | |
def calculate_threshold(value, is_high_price=True): | |
if 0 <= value <= 200: | |
return 0.20 | |
elif 201 <= value <= 500: | |
return 0.70 | |
elif 501 <= value <= 1000: | |
return 1.0 | |
elif 1001 <= value <= 2000: | |
return 2.0 | |
elif 2001 <= value <= 3000: | |
return 3.0 | |
elif 3001 <= value <= 4000: | |
return 4.0 | |
elif 4001 <= value <= 5000: | |
return 5.0 | |
else: | |
return 5.0 | |
def last_thursday(dt): | |
# Get the last day of the month | |
last_day_of_month = dt + MonthEnd(0) | |
# Calculate how many days to subtract to get the last Thursday | |
offset = (last_day_of_month.weekday() - 3) % 7 | |
return last_day_of_month - pd.Timedelta(days=offset) | |
def check_condition_passed(df, column_name, max_index, output_value, close_column, value1, value2, is_high = True): | |
# print(f"Max index: {max_index}") | |
for index_maybe in range(max_index-1, -1, -1): | |
# print(f"At index : {index_maybe}") | |
if is_high: | |
if df.loc[index_maybe, column_name] > output_value: | |
close_value = df.loc[index_maybe, close_column] | |
# print(f"current value: {df.loc[index_maybe, column_name]} - close value {close_value}") | |
if close_value > value1 and close_value > value2: | |
return True | |
break | |
else: | |
if df.loc[index_maybe, column_name] < output_value: | |
close_value = df.loc[index_maybe, close_column] | |
if close_value < value1 and close_value < value2: | |
return True | |
break | |
return False | |
# if is_high: | |
# filtered_df = df[(df.index < max_index) & (df[column_name] > output_value)] | |
# else: | |
# filtered_df = df[(df.index < max_index) & (df[column_name] < output_value)] | |
# if not filtered_df.empty: | |
# first_valid_row = filtered_df.iloc[0] | |
# print(f"Respective close row: {first_valid_row}") | |
# close_value = first_valid_row[close_column] | |
# print(f"Respective close value: {close_value}") | |
# if is_high: | |
# if close_value > value1 and close_value > value2: | |
# return True | |
# else: | |
# if close_value < value1 and close_value < value2: | |
# return True | |
# else: | |
# return False | |
def get_output_value(value1, value2, is_high=False): | |
if is_high: | |
return max(int(value1), int(value2)) + 1 | |
else: | |
return min(int(value1), int(value2)) - 1 | |
# Function to read CSV and generate Excel with modifications | |
def process_csv(file): | |
df = pd.read_csv(file) | |
df.columns = df.columns.str.strip() # Remove trailing spaces from column names | |
HIGH_NAME = "HIGH PRICE" | |
if HIGH_NAME not in df.columns: | |
HIGH_NAME = "HIGH" | |
LOW_NAME = "LOW PRICE" | |
if LOW_NAME not in df.columns: | |
LOW_NAME = "LOW" | |
CLOSE_NAME = "CLOSE PRICE" | |
if CLOSE_NAME not in df.columns: | |
CLOSE_NAME = "close" | |
DATE_NAME = "DATE" | |
if DATE_NAME not in df.columns: | |
DATE_NAME = "Date" | |
# Add three empty columns between LOW PRICE and CLOSE PRICE | |
low_price_index = df.columns.get_loc(CLOSE_NAME) | |
df.insert(low_price_index + 1, 'HIGH Result', '') | |
df.insert(low_price_index + 2, 'LOW Result', '') | |
df.insert(low_price_index + 3, 'Empty Column', '') | |
# Convert DATE to datetime | |
df[DATE_NAME] = pd.to_datetime(df[DATE_NAME], format='%d-%b-%Y') | |
# Detect the last Thursday of each month and insert an empty row after it | |
df['Last_Thursday'] = df[DATE_NAME].apply(last_thursday) | |
indices_to_insert = [] | |
for i in range(len(df)): | |
if df.loc[i, DATE_NAME] == df.loc[i, 'Last_Thursday']: | |
indices_to_insert.append(i) | |
df['Separator'] = '' | |
# Insert empty rows and update the Last_Thursday column | |
for idx in reversed(indices_to_insert): | |
# Insert an empty row | |
df = pd.concat([df.iloc[:idx], pd.DataFrame([{'Separator': 'Separator'}]), df.iloc[idx:]]).reset_index(drop=True) | |
price_columns = [HIGH_NAME, LOW_NAME, CLOSE_NAME] | |
df[price_columns] = df[price_columns].replace({',': ''}, regex=True).apply(pd.to_numeric, errors='coerce') | |
# Calculate global thresholds for HIGH PRICE and LOW PRICE columns | |
high_price_threshold = calculate_threshold(df[HIGH_NAME].max(), is_high_price=True) | |
low_price_threshold = calculate_threshold(df[LOW_NAME].min(), is_high_price=False) | |
# Process HIGH PRICE and LOW PRICE columns | |
def process_column(df, style_df, column_name, result_column_name, threshold): | |
element_used = [False] * len(df[column_name]) | |
# for last_thurday_date, group in df.groupby('Last_Thursday', sort=False): | |
grouped_df = df.groupby((df['Separator'] == 'Separator').cumsum()) | |
for group_name, group in grouped_df: | |
group = group[group['Separator'] != 'Separator'] | |
rows = group.index.tolist() | |
print(rows) | |
for i in range(len(rows) - 1, -1, -1): | |
if not element_used[rows[i]]: | |
for j in range(i - 1, -1, -1): | |
diff = abs(df.loc[rows[i], column_name] - df.loc[rows[j], column_name]) | |
if diff < threshold and not element_used[rows[j]]: | |
output_value = get_output_value(df.loc[rows[i], column_name], df.loc[rows[j], column_name], 'high' in column_name.lower()) | |
# print(f"i {rows[i]} j {rows[j]} {column_name}") | |
# print(f"{df.loc[rows[i], column_name]} {df.loc[rows[j], column_name]} diff {diff}, threshold: {threshold}, output value {output_value}") | |
df.at[rows[j], result_column_name] = output_value | |
element_used[rows[i]] = True | |
element_used[rows[j]] = True | |
color = generate_random_light_color() | |
style_df = set_cell_color(style_df, index=rows[i], column=column_name, hex_color=color) | |
style_df = set_cell_color(style_df, index=rows[j], column=column_name, hex_color=color) | |
# check if there is higher or lower value, if yes, then colorize it | |
response = check_condition_passed(df, column_name, rows[j], output_value, CLOSE_NAME, df.loc[rows[i], column_name], df.loc[rows[j], column_name], 'high' in column_name.lower()) | |
if response: | |
style_df = set_cell_color(style_df, index=rows[j], column=result_column_name, hex_color=color) | |
break | |
# Create a dictionary to map column names to Excel letters | |
column_letter_map = {v: k for k, v in enumerate(df.columns, start=1)} | |
# Save to an Excel file and get the workbook | |
style_df = pd.DataFrame('', index=df.index, columns=df.columns) | |
output_file = file.replace(".csv", "_processed.xlsx") | |
process_column(df, style_df, HIGH_NAME, 'HIGH Result', high_price_threshold) | |
process_column(df, style_df, LOW_NAME, 'LOW Result', low_price_threshold) | |
# add an empty row before the new month | |
df[DATE_NAME] = df[DATE_NAME].dt.strftime('%d-%b-%Y') | |
# df['Last_Thursday'] = df['Last_Thursday'].dt.strftime('%d-%b-%Y') | |
styled_df = df.style.apply(lambda _: style_df, axis=None) | |
styled_df.to_excel(output_file, engine='openpyxl', index=False) | |
return output_file | |
# Gradio Interface | |
def gradio_interface(file): | |
return process_csv(file) | |
# Gradio app interface | |
iface = gr.Interface( | |
fn=gradio_interface, | |
inputs=gr.File(label="Upload CSV File (.csv)", file_count="single"), | |
outputs=gr.File(label="Download Processed Excel File"), | |
title="CSV to Excel Processor with Cell Highlighting", | |
description="Upload a CSV file with stock data, and download a processed Excel file with highlighted cells." | |
) | |
if __name__ == "__main__": | |
iface.launch() | |