File size: 2,353 Bytes
79e1719
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
Trains an Alternating Least Squares (ALS) model for user/movie ratings.
The input is a Parquet ratings dataset (see etl_data.py), and we output
an mlflow artifact called 'als-model'.
"""
import click
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

import mlflow
import mlflow.spark


@click.command()
@click.option("--ratings-data")
@click.option("--split-prop", default=0.8, type=float)
@click.option("--max-iter", default=10, type=int)
@click.option("--reg-param", default=0.1, type=float)
@click.option("--rank", default=12, type=int)
@click.option("--cold-start-strategy", default="drop")
def train_als(ratings_data, split_prop, max_iter, reg_param, rank, cold_start_strategy):
    seed = 42

    with pyspark.sql.SparkSession.builder.getOrCreate() as spark:
        ratings_df = spark.read.parquet(ratings_data)
        (training_df, test_df) = ratings_df.randomSplit([split_prop, 1 - split_prop], seed=seed)
        training_df.cache()
        test_df.cache()

        mlflow.log_metric("training_nrows", training_df.count())
        mlflow.log_metric("test_nrows", test_df.count())

        print(f"Training: {training_df.count()}, test: {test_df.count()}")

        als = (
            ALS()
            .setUserCol("userId")
            .setItemCol("movieId")
            .setRatingCol("rating")
            .setPredictionCol("predictions")
            .setMaxIter(max_iter)
            .setSeed(seed)
            .setRegParam(reg_param)
            .setColdStartStrategy(cold_start_strategy)
            .setRank(rank)
        )

        als_model = Pipeline(stages=[als]).fit(training_df)

        reg_eval = RegressionEvaluator(
            predictionCol="predictions", labelCol="rating", metricName="mse"
        )

        predicted_test_dF = als_model.transform(test_df)

        test_mse = reg_eval.evaluate(predicted_test_dF)
        train_mse = reg_eval.evaluate(als_model.transform(training_df))

        print(f"The model had a MSE on the test set of {test_mse}")
        print(f"The model had a MSE on the (train) set of {train_mse}")
        mlflow.log_metric("test_mse", test_mse)
        mlflow.log_metric("train_mse", train_mse)
        mlflow.spark.log_model(als_model, "als-model")


if __name__ == "__main__":
    train_als()