| | import os |
| | import uuid |
| | import shutil |
| | import pandas as pd |
| | import polars as pl |
| | import time |
| | import logging |
| | from typing import Optional, Tuple |
| |
|
| | |
# Configure root logging once for the module: INFO and above, with each
# record formatted as "timestamp - logger name - level - message".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-scoped logger used by every function in this file.
logger = logging.getLogger(__name__)
| |
|
def validate_csv_structure(df: pd.DataFrame) -> Tuple[bool, str]:
    """
    Check that a loaded DataFrame has the shape the pipeline expects.

    The checks run in order and the first failure wins: the frame must be
    non-empty, contain the ``_id`` and ``text`` columns, have no missing
    values in either column, and have no repeated ``_id``.

    Args:
        df: DataFrame to validate

    Returns:
        Tuple[bool, str]: ``(True, "")`` when every check passes, otherwise
        ``(False, <description of the first failed check>)``
    """
    if df.empty:
        return False, "CSV file is empty"

    # Collect every absent required column so the message lists them all.
    absent = []
    for name in ('_id', 'text'):
        if name not in df.columns:
            absent.append(name)
    if absent:
        return False, f"Missing required columns: {', '.join(absent)}"

    # Null checks, evaluated in this order: identifiers first, then text.
    null_checks = (
        ('_id', "Found empty _id values"),
        ('text', "Found empty text values"),
    )
    for column, message in null_checks:
        if df[column].isna().any():
            return False, message

    # Nulls were rejected above, so uniqueness here means "no duplicates".
    if not df['_id'].is_unique:
        return False, "Found duplicate _id values"

    return True, ""
| |
|
def getDataFrame(path: str) -> Optional[pl.DataFrame]:
    """
    Load a tab-separated file, validate it, and return it as a Polars frame.

    The file is read with pandas (UTF-8, first line as header, malformed
    lines reported as warnings), checked with ``validate_csv_structure``,
    and then cleaned: ``text`` is cast to string and stripped, and rows
    whose text is blank after stripping are dropped.

    Args:
        path: Path to the CSV file

    Returns:
        Optional[pl.DataFrame]: The cleaned DataFrame, or None if reading,
        parsing or validation fails or no rows survive cleaning
    """
    try:
        frame = pd.read_csv(
            path,
            sep="\t",
            header=0,
            on_bad_lines='warn',
            encoding='utf-8',
        )

        # Structural validation happens before any cleaning.
        ok, problem = validate_csv_structure(frame)
        if not ok:
            logger.error(problem)
            return None

        # Normalise the text column, then keep only rows with real content.
        frame['text'] = frame['text'].astype(str).str.strip()
        has_content = frame['text'].str.len() > 0
        frame = frame[has_content]

        if frame.empty:
            logger.error("No valid text data found after cleaning")
            return None

        result = pl.from_pandas(frame)
        logger.info(f"Successfully loaded {len(result)} rows from CSV")
        return result

    except pd.errors.EmptyDataError:
        logger.error("CSV file is empty")
        return None
    except pd.errors.ParserError as e:
        logger.error(f"Error parsing CSV file: {str(e)}")
        return None
    except Exception as e:
        # Catch-all boundary: any other failure is logged and reported as None.
        logger.error(f"Unexpected error reading CSV: {str(e)}")
        return None
| |
|
def save_to_csv(dataframe: pl.DataFrame, folder_path: str = "data") -> Optional[str]:
    """
    Save a DataFrame as a tab-separated file and return the file's path.

    The file name combines the current Unix timestamp (whole seconds) with a
    short random suffix. The timestamp alone is not unique: two saves within
    the same second would otherwise target the same file and the later one
    would silently overwrite the earlier.

    Args:
        dataframe: Polars DataFrame to save
        folder_path: Directory to write into; created if it does not exist

    Returns:
        Optional[str]: Path to saved file, or None if there is nothing to
        save or the write fails
    """
    try:
        if dataframe is None or dataframe.is_empty():
            logger.warning("No data to save")
            return None

        os.makedirs(folder_path, exist_ok=True)

        timestamp = int(time.time())
        # Random suffix keeps names distinct for saves in the same second.
        unique_suffix = uuid.uuid4().hex[:8]
        csv_path = f"{folder_path}/results_{timestamp}_{unique_suffix}.csv"

        dataframe.write_csv(csv_path, separator="\t")
        logger.info(f"Results saved to {csv_path}")

        return csv_path

    except Exception as e:
        # Best-effort save: failures are logged and reported as None.
        logger.error(f"Error saving results: {str(e)}")
        return None
| |
|
def _remove_stale_files(path: str, max_age: float) -> int:
    """Delete regular files directly under *path* older than *max_age* seconds; return how many were removed."""
    removed = 0
    current_time = time.time()

    for filename in os.listdir(path):
        file_path = os.path.join(path, filename)
        try:
            if not os.path.isfile(file_path):
                continue

            file_age = current_time - os.path.getmtime(file_path)
            if file_age > max_age:
                os.remove(file_path)
                logger.info(f"Deleted old file: {file_path}")
                removed += 1
        except OSError as e:
            # A locked file, or one that vanished since listdir, must not
            # stop the remaining files from being cleaned up.
            logger.error(f"Error deleting {file_path}: {str(e)}")

    return removed


def delete_folder_periodically(path: str, interval: int = 3600) -> None:
    """
    Periodically delete old files from a folder.

    Runs forever: on each cycle, regular files directly inside ``path`` whose
    modification time is more than ``interval`` seconds in the past are
    removed, then the function sleeps for ``interval`` seconds. Subfolders
    are left untouched. A failure on one file is logged and does not prevent
    the other files from being processed.

    Args:
        path: Path to folder to clean
        interval: Interval between cleanups in seconds; also the age beyond
            which a file is deleted
    """
    while True:
        try:
            if os.path.exists(path):
                _remove_stale_files(path, interval)
        except Exception as e:
            # Keep the loop alive whatever a single pass raises.
            logger.error(f"Error in cleanup task: {str(e)}")

        # Sleep once per cycle, whether or not the pass succeeded.
        time.sleep(interval)