poc / experimentation_mlops /mlops /ingest_convert.py
ryanrahmadifa
Added files
79e1719
raw
history blame
1.53 kB
"""
Converts the raw CSV data to Parquet and logs it as an MLflow artifact.
"""
import os
import tempfile
import click
import pandas as pd
import mlflow
@click.command(
    help="Given a CSV file (see load_raw_data), transforms it into Parquet "
    "in an mlflow artifact called 'data-parquet-dir'"
)
@click.option("--data-csv")
@click.option(
    "--max-row-limit", default=10000, help="Limit the data size to run comfortably on a laptop."
)
def ingest_convert(data_csv, max_row_limit):
    """Convert a CSV file to Parquet and log it as an MLflow artifact.

    The CSV is read with pandas, optionally truncated to its first
    ``max_row_limit`` rows, written as a Parquet file inside a temporary
    directory, and that directory is uploaded to the active MLflow run
    under the artifact path ``data-parquet-dir``.

    Args:
        data_csv: Location of the CSV file, passed straight to
            ``pandas.read_csv``.
        max_row_limit: Number of leading rows to keep. ``-1`` disables the
            limit and keeps every row.
    """
    with mlflow.start_run():
        # The context manager removes the scratch directory once the upload
        # has finished, so repeated runs do not pile up temp folders.
        with tempfile.TemporaryDirectory() as tmpdir:
            # log_artifacts uploads the contents of a directory, so the
            # Parquet file has to live inside one rather than be the path itself.
            data_parquet_dir = os.path.join(tmpdir, "data-parquet")
            os.makedirs(data_parquet_dir)
            print(f"Converting data CSV {data_csv} to Parquet {data_parquet_dir}")

            data_df = pd.read_csv(data_csv)
            # Apply the row limit BEFORE writing; otherwise the option has
            # no effect on the uploaded data.
            if max_row_limit != -1:
                data_df = data_df.iloc[:max_row_limit]
            data_df.to_parquet(os.path.join(data_parquet_dir, "data.parquet"))

            print(f"Uploading Parquet data: {data_parquet_dir}")
            mlflow.log_artifacts(data_parquet_dir, "data-parquet-dir")
# Run the click command only when this file is executed as a script;
# click parses --data-csv / --max-row-limit from the command line.
if __name__ == "__main__":
    ingest_convert()