Spaces:
Sleeping
Sleeping
""" | |
Trains an Alternating Least Squares (ALS) model for user/movie ratings.
The input is a Parquet ratings dataset (see etl_data.py), and we output
an MLflow artifact called 'als-model'.
""" | |
import click | |
import pyspark | |
from pyspark.ml import Pipeline | |
from pyspark.ml.evaluation import RegressionEvaluator | |
from pyspark.ml.recommendation import ALS | |
import mlflow | |
import mlflow.spark | |
def train_als(
    ratings_data: str,
    split_prop: float,
    max_iter: int,
    reg_param: float,
    rank: int,
    cold_start_strategy: str,
) -> None:
    """Train an ALS recommender on a Parquet ratings dataset and log it to MLflow.

    The ratings are randomly split into a training and a test set, an ALS
    model (wrapped in a single-stage ``Pipeline``) is fitted on the training
    set, and the mean squared error on both sets is logged together with the
    fitted model.

    Args:
        ratings_data: Path of the Parquet ratings dataset. The ALS stage reads
            the ``userId``, ``movieId`` and ``rating`` columns from it.
        split_prop: Weight of the training split; the test split receives
            ``1 - split_prop``.
        max_iter: Value forwarded to ``ALS.setMaxIter``.
        reg_param: Value forwarded to ``ALS.setRegParam``.
        rank: Value forwarded to ``ALS.setRank``.
        cold_start_strategy: Value forwarded to ``ALS.setColdStartStrategy``.

    Side effects:
        Logs the ``training_nrows``, ``test_nrows``, ``test_mse`` and
        ``train_mse`` metrics and the ``als-model`` model to MLflow, and
        prints the row counts and both errors to stdout.
    """
    seed = 42  # shared by the random split and by ALS for reproducibility

    # Used as a context manager, the session is stopped when the block exits.
    with pyspark.sql.SparkSession.builder.getOrCreate() as spark:
        ratings_df = spark.read.parquet(ratings_data)
        training_df, test_df = ratings_df.randomSplit(
            [split_prop, 1 - split_prop], seed=seed
        )
        training_df.cache()
        test_df.cache()

        # count() is a Spark action, so run it once per split and reuse the
        # result for both the logged metric and the printed summary.
        training_nrows = training_df.count()
        mlflow.log_metric("training_nrows", training_nrows)
        test_nrows = test_df.count()
        mlflow.log_metric("test_nrows", test_nrows)
        print(f"Training: {training_nrows}, test: {test_nrows}")

        als = (
            ALS()
            .setUserCol("userId")
            .setItemCol("movieId")
            .setRatingCol("rating")
            .setPredictionCol("predictions")
            .setMaxIter(max_iter)
            .setSeed(seed)
            .setRegParam(reg_param)
            .setColdStartStrategy(cold_start_strategy)
            .setRank(rank)
        )
        als_model = Pipeline(stages=[als]).fit(training_df)

        reg_eval = RegressionEvaluator(
            predictionCol="predictions", labelCol="rating", metricName="mse"
        )
        predicted_test_df = als_model.transform(test_df)
        test_mse = reg_eval.evaluate(predicted_test_df)
        train_mse = reg_eval.evaluate(als_model.transform(training_df))

        print(f"The model had a MSE on the test set of {test_mse}")
        print(f"The model had a MSE on the (train) set of {train_mse}")
        mlflow.log_metric("test_mse", test_mse)
        mlflow.log_metric("train_mse", train_mse)
        mlflow.spark.log_model(als_model, "als-model")
if __name__ == "__main__":
    # train_als has six required parameters, so calling it with no arguments
    # raises TypeError. Expose them as command-line options through click
    # (already imported by this file) and forward the parsed values.

    @click.command()
    @click.option("--ratings-data", required=True, help="Path of the Parquet ratings dataset.")
    @click.option("--split-prop", required=True, type=float, help="Weight of the training split.")
    @click.option("--max-iter", required=True, type=int, help="Value passed to ALS.setMaxIter.")
    @click.option("--reg-param", required=True, type=float, help="Value passed to ALS.setRegParam.")
    @click.option("--rank", required=True, type=int, help="Value passed to ALS.setRank.")
    @click.option("--cold-start-strategy", required=True, help="Value passed to ALS.setColdStartStrategy.")
    def _cli(ratings_data, split_prop, max_iter, reg_param, rank, cold_start_strategy):
        """Train the ALS model with settings taken from the command line."""
        train_als(ratings_data, split_prop, max_iter, reg_param, rank, cold_start_strategy)

    _cli()