# {% include 'templates/license_header' %}

import os
from typing import Optional

import click
from pipelines import (
    feature_engineering,
    inference,
    breast_cancer_training,
    breast_cancer_deployment_pipeline,
)
from zenml.client import Client
from zenml.logger import get_logger

logger = get_logger(__name__)
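

# CLI wiring for `main` (a sketch): `click` is imported above but no command is
# defined here, so these options are an assumption that simply mirrors the
# function signature one-to-one and lets the pipeline flags below be toggled
# from the command line.
@click.command()
@click.option("--train-dataset-name", default="dataset_trn", type=str)
@click.option("--train-dataset-version-name", default=None, type=str)
@click.option("--test-dataset-name", default="dataset_tst", type=str)
@click.option("--test-dataset-version-name", default=None, type=str)
@click.option("--config", default=None, type=str)
@click.option("--feature-pipeline", is_flag=True, default=False)
@click.option("--training-pipeline", is_flag=True, default=False)
@click.option("--inference-pipeline", is_flag=True, default=False)
@click.option("--deployment-pipeline", is_flag=True, default=False)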
def main(
    train_dataset_name: str = "dataset_trn",
    train_dataset_version_name: Optional[str] = None,
    test_dataset_name: str = "dataset_tst",
    test_dataset_version_name: Optional[str] = None,
    config: Optional[str] = None,
    feature_pipeline: bool = False,
    training_pipeline: bool = False,
    inference_pipeline: bool = False,
    deployment_pipeline: bool = False,
):
"""Main entry point for the pipeline execution. | |
This entrypoint is where everything comes together: | |
* configuring pipeline with the required parameters | |
(some of which may come from command line arguments, but most | |
of which comes from the YAML config files) | |
* launching the pipeline | |
""" | |
    config_folder = os.path.join(
        os.path.dirname(os.path.realpath(__file__)),
        "configs",
    )
    client = Client()

    # Execute Feature Engineering Pipeline
    if feature_pipeline:
        pipeline_args = {}
        pipeline_args["config_path"] = os.path.join(
            config_folder, "feature_engineering.yaml"
        )
        run_args_feature = {}
        feature_engineering.with_options(**pipeline_args)(**run_args_feature)
        logger.info("Feature Engineering pipeline finished successfully!")

        train_dataset_artifact = client.get_artifact_version(
            train_dataset_name
        )
        test_dataset_artifact = client.get_artifact_version(test_dataset_name)
        logger.info(
            "The latest feature engineering pipeline produced the following "
            f"artifacts: \n\n1. Train Dataset - Name: {train_dataset_name}, "
            f"Version Name: {train_dataset_artifact.version} \n2. Test Dataset: "
            f"Name: {test_dataset_name}, Version Name: {test_dataset_artifact.version}"
        )

    # Execute Training Pipeline
    if training_pipeline:
        pipeline_args = {}
        if config is None:
            pipeline_args["config_path"] = os.path.join(config_folder, "training.yaml")
        else:
            pipeline_args["config_path"] = os.path.join(config_folder, config)
        run_args_train = {}

        # If a dataset version name is specified, use versioned artifacts
        if train_dataset_version_name or test_dataset_version_name:
            # However, both train and test dataset versions must be specified
            assert (
                train_dataset_version_name is not None
                and test_dataset_version_name is not None
            )
            train_dataset_artifact = client.get_artifact_version(
                train_dataset_name, train_dataset_version_name
            )
            test_dataset_artifact = client.get_artifact_version(
                test_dataset_name, test_dataset_version_name
            )
            # Use the versioned artifacts as pipeline inputs
            run_args_train["train_dataset_id"] = train_dataset_artifact.id
            run_args_train["test_dataset_id"] = test_dataset_artifact.id

        # Optional remote-execution configuration (an assumption: the active
        # ZenML stack has a SageMaker step operator and a Docker-enabled
        # orchestrator). The step code itself stays the same; only the
        # configuration changes.
        from zenml.config import DockerSettings
        from steps import model_trainer  # assumed location of the trainer step

        step_args = {}
        # Configure which step operator should be used; an ml.m5.large
        # instance is plenty for this dataset.
        step_args["settings"] = {
            "step_operator.sagemaker": {
                "estimator_args": {"instance_type": "ml.m5.large"}
            }
        }
        # Update the step. This could also be done in YAML.
        model_trainer = model_trainer.with_options(**step_args)

        # Extra requirements to install into the Docker image for remote runs.
        docker_settings = DockerSettings(
            requirements=[
                "pyarrow",
                "scikit-learn==1.1.1",
            ],
        )
        pipeline_args["enable_cache"] = True
        pipeline_args["settings"] = {"docker": docker_settings}

        breast_cancer_training.with_options(**pipeline_args)(**run_args_train)
        logger.info("Training pipeline finished successfully!")

    # Execute Inference Pipeline
    if inference_pipeline:
        pipeline_args = {}
        if config is None:
            pipeline_args["config_path"] = os.path.join(config_folder, "inference.yaml")
        else:
            pipeline_args["config_path"] = os.path.join(config_folder, config)
        run_args_inference = {}
        inference.with_options(**pipeline_args)(**run_args_inference)
        logger.info("Inference pipeline finished successfully!")

    # Execute Deployment Pipeline
    if deployment_pipeline:
        pipeline_args = {}
        pipeline_args["config_path"] = os.path.join(config_folder, "deployment.yaml")
        run_args_deployment = {}
        breast_cancer_deployment_pipeline.with_options(**pipeline_args)(
            **run_args_deployment
        )
        logger.info("Deployment pipeline finished successfully!")


if __name__ == "__main__":
    main()
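
# Example invocations (a sketch; the flag names assume the click options above):
#   python run.py --feature-pipeline
#   python run.py --training-pipeline
#   python run.py --training-pipeline --train-dataset-version-name=1 --test-dataset-version-name=1
#   python run.py --inference-pipeline
#   python run.py --deployment-pipeline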