"""
This module defines the following routine used by the 'ingest' step of the
time series forecasting flow:

- ``ingest_request``: Downloads the raw dataset archive from a URL and logs
  the extracted CSV file as an MLflow artifact called 'data-csv-dir'.
"""
import os
import tempfile
import zipfile

import click
import gdown
import mlflow
import pandas as pd
import requests


@click.command(
    help="Downloads the dataset and saves it as an mlflow artifact "
    "called 'data-csv-dir'."
)
@click.option(
    "--url",
    default="https://drive.google.com/uc?id=1H8RHsrgYMd6VC23_OJqrN6o_mL78pWpx",
)
def ingest_request(url: str) -> None:
    """
    Download data from the specified url and log it as an MLflow artifact.

    :param url: Url to the dataset file. Expected to be a zip archive
        containing ``2week_news_data.csv`` (e.g. a Google Drive download link).
    :return: None. The downloaded data is logged to the active MLflow run
        as an artifact under 'data-csv-dir'.
    """
    with mlflow.start_run():
        # Work in a fresh temp dir so repeated runs don't collide.
        local_dir = tempfile.mkdtemp()
        local_filename = os.path.join(local_dir, "news-data.zip")
        print(f"Downloading {url} to {local_filename}")
        # gdown handles Google Drive's large-file confirmation flow,
        # which a plain requests.get cannot.
        gdown.download(url, local_filename, quiet=False)
        extracted_dir = local_dir
        print(f"Extracting {local_filename} into {extracted_dir}")
        with zipfile.ZipFile(local_filename, "r") as zip_ref:
            zip_ref.extractall(local_dir)
        data_file = os.path.join(extracted_dir, "2week_news_data.csv")
        print(f"Uploading data: {data_file}")
        mlflow.log_artifact(data_file, "data-csv-dir")


if __name__ == "__main__":
    ingest_request()