Spaces:
Sleeping
Sleeping
""" | |
Converts the raw CSV form to a Parquet form (all columns are kept) and logs it as an MLflow artifact.
""" | |
import os | |
import tempfile | |
import click | |
import pandas as pd | |
import mlflow | |
def ingest_convert(data_csv, max_row_limit):
    """Convert a CSV file to Parquet and log the result as an MLflow artifact.

    The CSV is loaded with pandas, optionally truncated to its leading rows,
    written as a Parquet file inside a temporary directory, and that directory
    is logged to the active MLflow run under ``data-parquet-dir``.

    Args:
        data_csv: Location of the CSV to read; passed straight to
            ``pandas.read_csv``.
        max_row_limit: Number of leading rows to keep. ``-1`` disables the
            limit and keeps every row.
    """
    # The temporary directory is only a staging area: log_artifacts copies its
    # contents into the run's artifact store, so it can be removed afterwards.
    with mlflow.start_run(), tempfile.TemporaryDirectory() as tmpdir:
        data_parquet_dir = os.path.join(tmpdir, "data-parquet")
        print(f"Converting data CSV {data_csv} to Parquet {data_parquet_dir}")

        data_df = pd.read_csv(data_csv)
        # Apply the row limit BEFORE writing, otherwise it has no effect on
        # the Parquet output.
        if max_row_limit != -1:
            data_df = data_df.iloc[:max_row_limit]

        # log_artifacts uploads the contents of a directory, so write the
        # Parquet file inside a real directory instead of at the directory
        # path itself.
        os.makedirs(data_parquet_dir)
        data_df.to_parquet(os.path.join(data_parquet_dir, "data.parquet"))

        print(f"Uploading Parquet data: {data_parquet_dir}")
        mlflow.log_artifacts(data_parquet_dir, "data-parquet-dir")
if __name__ == "__main__":
    # ingest_convert requires two arguments, so calling it bare raises
    # TypeError. Read them from the command line with click, which the file
    # already imports; the importable function itself stays a plain callable.
    @click.command(
        help="Convert a CSV file to Parquet and log it as an MLflow artifact."
    )
    @click.option(
        "--data-csv",
        required=True,
        help="Location of the CSV file to convert.",
    )
    @click.option(
        "--max-row-limit",
        type=int,
        default=-1,
        show_default=True,
        help="Number of leading rows to keep; -1 keeps every row.",
    )
    def _cli(data_csv, max_row_limit):
        ingest_convert(data_csv, max_row_limit)

    _cli()