""" | |
This module defines the following routines used by the 'ingest' step of the time series forecasting flow: | |
- ``load_file_as_dataframe``: Defines customizable logic for parsing dataset formats that are not | |
natively parsed by MLflow Recipes (i.e. formats other than Parquet, Delta, and Spark SQL). | |
""" | |
import pandas as pd | |
import os | |
import tempfile | |
import click | |
import mlflow | |
import gdown | |
import requests | |
import zipfile | |
@click.command()
@click.option("--url", required=True, help="URL of the zipped dataset to download.")
def ingest_request(url: str) -> None:
    """
    Download a zipped news dataset from ``url``, extract it, and log the
    contained CSV file as an MLflow run artifact.

    :param url: URL of the dataset zip file. Downloaded via ``gdown``, so
        Google Drive share links are supported — TODO confirm all callers
        pass Drive-compatible URLs.
    :return: None. The CSV is logged to the active MLflow run under the
        ``data-csv-dir`` artifact path rather than returned.
    """
    with mlflow.start_run():
        local_dir = tempfile.mkdtemp()
        local_filename = os.path.join(local_dir, "news-data.zip")
        print(f"Downloading {url} to {local_filename}")
        # gdown handles download quirks (e.g. Google Drive confirmation
        # tokens) that a plain streaming requests.get would not.
        gdown.download(url, local_filename, quiet=False)
        print(f"Extracting {local_filename} into {local_dir}")
        with zipfile.ZipFile(local_filename, "r") as zip_ref:
            zip_ref.extractall(local_dir)
        # The archive is expected to contain this exact CSV file name.
        data_file = os.path.join(local_dir, "2week_news_data.csv")
        print(f"Uploading data: {data_file}")
        mlflow.log_artifact(data_file, "data-csv-dir")


if __name__ == "__main__":
    # click parses --url from sys.argv, so the argument-free call is correct:
    # ingest_request is a click command, not a plain function, at this point.
    ingest_request()