File size: 4,429 Bytes
79e1719
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""
Trains a Keras model for user/movie ratings. The input is a Parquet
ratings dataset (see etl_data.py) and an ALS model (see als.py), which we
will use to supplement our input and train using.
"""
from itertools import chain

import click
import numpy as np
import pandas as pd
import pyspark
import tensorflow as tf
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, FloatType
from tensorflow import keras
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential

import mlflow
import mlflow.spark


def _set_tf_random_seed(seed):
    """Seed TensorFlow's global random number generator on TF 1.x and TF 2.x.

    ``tf.set_random_seed`` only exists in TensorFlow 1.x; TensorFlow 2.x
    exposes the same functionality as ``tf.random.set_seed``.
    """
    if hasattr(tf, "set_random_seed"):  # TensorFlow 1.x
        tf.set_random_seed(seed)
    else:  # TensorFlow 2.x
        tf.random.set_seed(seed)


def _with_concat_features(joined_df, concat_arrays_udf):
    """Project a factor-joined ratings DataFrame onto the model's inputs.

    Returns a Spark DataFrame with ``userId``, ``movieId``, a ``features``
    array (item factors followed by user factors, built by
    ``concat_arrays_udf``) and ``rating`` cast to float.
    """
    return joined_df.select(
        "userId",
        "movieId",
        concat_arrays_udf(col("iFeatures"), col("uFeatures")).alias("features"),
        col("rating").cast("float"),
    )


def _features_to_matrix(pandas_df):
    """Explode the ``features`` list column into one column per vector element."""
    return pd.DataFrame(pandas_df.features.values.tolist(), index=pandas_df.index)


@click.command()
@click.option("--ratings-data", help="Path readable by Spark to the ratings Parquet file")
@click.option("--als-model-uri", help="Path readable by load_model to ALS MLmodel")
@click.option("--hidden-units", default=20, type=int)
def train_keras(ratings_data, als_model_uri, hidden_units):
    """Train a dense Keras regressor on ALS factors and log the results to MLflow.

    Loads the ALS model (the first stage of the Spark model at
    ``als_model_uri``) and splits the ratings 80/20 into train and test sets.
    It joins the ALS item and user factor vectors onto each rating and fits a
    small dense network that predicts ``rating`` from the concatenated
    factors. Row counts, train/test MSE and the fitted model are logged to
    MLflow.

    Args:
        ratings_data: Path readable by Spark to the ratings Parquet file.
        als_model_uri: URI accepted by ``mlflow.spark.load_model`` for the ALS model.
        hidden_units: Width of the second hidden layer.
    """
    # Only ``mlflow`` and ``mlflow.spark`` are imported at module level; import
    # the TensorFlow flavor explicitly instead of relying on lazy attribute access.
    import mlflow.tensorflow

    np.random.seed(0)
    _set_tf_random_seed(42)  # For reproducibility

    with pyspark.sql.SparkSession.builder.getOrCreate() as spark:
        als_model = mlflow.spark.load_model(als_model_uri).stages[0]
        ratings_df = spark.read.parquet(ratings_data)
        training_df, test_df = ratings_df.randomSplit([0.8, 0.2], seed=42)
        training_df.cache()
        test_df.cache()

        # Count each split once; every .count() call launches a Spark job.
        training_nrows = training_df.count()
        test_nrows = test_df.count()
        mlflow.log_metric("training_nrows", training_nrows)
        mlflow.log_metric("test_nrows", test_nrows)

        print(f"Training: {training_nrows}, test: {test_nrows}")

        user_factors = als_model.userFactors.selectExpr("id as userId", "features as uFeatures")
        item_factors = als_model.itemFactors.selectExpr("id as movieId", "features as iFeatures")
        joined_train_df = training_df.join(item_factors, on="movieId").join(
            user_factors, on="userId"
        )
        joined_test_df = test_df.join(item_factors, on="movieId").join(user_factors, on="userId")

        # Combine each row's item-factor and user-factor vectors into a single
        # feature vector; it is exploded into one column per element below.
        def concat_arrays(*args):
            return list(chain(*args))

        concat_arrays_udf = udf(concat_arrays, ArrayType(FloatType()))

        pandas_df = _with_concat_features(joined_train_df, concat_arrays_udf).toPandas()
        pandas_test_df = _with_concat_features(joined_test_df, concat_arrays_udf).toPandas()

        # One column per feature-vector element; this is what the network trains on.
        x_test = _features_to_matrix(pandas_test_df)
        x_train = _features_to_matrix(pandas_df)

        # Show matrix for example.
        print("Training matrix:")
        print(x_train)

        # The input width is the combined length of the item and user factor
        # vectors. Derive it from the data instead of hard-coding it, so ALS
        # models trained with a different rank still work.
        n_features = x_train.shape[1]

        # Create our Keras model with two fully connected hidden layers.
        model = Sequential()
        model.add(Dense(30, input_dim=n_features, activation="relu"))
        model.add(Dense(hidden_units, activation="relu"))
        model.add(Dense(1, activation="linear"))

        # ``learning_rate`` replaces the deprecated ``lr`` argument, which newer
        # Keras releases ignore or reject.
        model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=0.0001))

        early_stopping = EarlyStopping(
            monitor="val_loss", min_delta=0.0001, patience=2, mode="auto"
        )

        model.fit(
            x_train,
            pandas_df["rating"],
            validation_split=0.2,
            verbose=2,
            epochs=3,
            batch_size=128,
            shuffle=False,
            callbacks=[early_stopping],
        )

        # With no extra metrics compiled in, evaluate() returns the scalar MSE loss.
        train_mse = model.evaluate(x_train, pandas_df["rating"], verbose=2)
        test_mse = model.evaluate(x_test, pandas_test_df["rating"], verbose=2)
        mlflow.log_metric("test_mse", test_mse)
        mlflow.log_metric("train_mse", train_mse)

        print(f"The model had a MSE on the test set of {test_mse}")
        mlflow.tensorflow.log_model(model, "keras-model")


if __name__ == "__main__":
    train_keras()