"""
Converts the raw CSV form to a Parquet form and logs it as an MLflow artifact.
"""
import os
import tempfile

import click
import pandas as pd
import mlflow


@click.command(
    help="Given a CSV file (see load_raw_data), transforms it into Parquet "
    "in an mlflow artifact called 'data-parquet-dir'"
)
@click.option("--data-csv")
@click.option(
    "--max-row-limit", default=10000, help="Limit the data size to run comfortably on a laptop."
)
def ingest_convert(data_csv, max_row_limit):
    """Read a CSV, optionally truncate it, and log it as Parquet to MLflow.

    Args:
        data_csv: Path (or any location ``pandas.read_csv`` accepts) of the
            input CSV file.
        max_row_limit: Maximum number of leading rows to keep. ``-1`` keeps
            every row.

    The Parquet output is written into a temporary directory, uploaded under
    the artifact path ``data-parquet-dir`` of a new MLflow run, and the
    temporary directory is removed afterwards.
    """
    # TemporaryDirectory guarantees the scratch space is removed even when the
    # conversion or the upload raises.
    with mlflow.start_run(), tempfile.TemporaryDirectory() as tmpdir:
        data_parquet_dir = os.path.join(tmpdir, "data-parquet")
        # log_artifacts uploads the contents of a directory, so the Parquet
        # file has to live inside one rather than at the directory path itself.
        os.makedirs(data_parquet_dir)
        print(f"Converting data CSV {data_csv} to Parquet {data_parquet_dir}")

        data_df = pd.read_csv(data_csv)
        # Apply the row limit BEFORE writing; otherwise the limit has no
        # effect on the uploaded artifact.
        if max_row_limit != -1:
            data_df = data_df.iloc[:max_row_limit]
        data_df.to_parquet(os.path.join(data_parquet_dir, "data.parquet"))

        print(f"Uploading Parquet data: {data_parquet_dir}")
        mlflow.log_artifacts(data_parquet_dir, "data-parquet-dir")


if __name__ == "__main__":
    ingest_convert()