Spaces:
Sleeping
Sleeping
""" | |
Trains a Keras model for user/movie ratings. The input is a Parquet
ratings dataset (see etl_data.py) and an ALS model (see als.py), whose
learned user and item factors are used as the input features for training.
""" | |
from itertools import chain | |
import click | |
import numpy as np | |
import pandas as pd | |
import pyspark | |
import tensorflow as tf | |
from pyspark.sql.functions import col, udf | |
from pyspark.sql.types import ArrayType, FloatType | |
from tensorflow import keras | |
from tensorflow.keras.callbacks import EarlyStopping | |
from tensorflow.keras.layers import Dense | |
from tensorflow.keras.models import Sequential | |
import mlflow | |
import mlflow.spark | |
def train_keras(ratings_data, als_model_uri, hidden_units):
    """Train a Keras rating regressor on ALS factors and log it to MLflow.

    The ratings are split 80/20 into training and test sets. Each rating row
    is joined with the ALS item and user factor vectors, which are
    concatenated into one feature vector. A small fully connected network is
    fit on those features to predict the rating.

    Logged to the active MLflow run: the metrics ``training_nrows``,
    ``test_nrows``, ``train_mse`` and ``test_mse``, and the trained model
    under the artifact path ``"keras-model"``.

    Args:
        ratings_data: Path passed to ``spark.read.parquet``. The dataset must
            provide ``userId``, ``movieId`` and ``rating`` columns.
        als_model_uri: URI passed to ``mlflow.spark.load_model``. The first
            stage of the loaded pipeline is used as the ALS model.
        hidden_units: Number of units in the second hidden layer.

    Returns:
        None.
    """
    np.random.seed(0)
    # For reproducibility. ``tf.set_random_seed`` was removed from the
    # top-level namespace in TensorFlow 2, so prefer ``tf.random.set_seed``
    # and only fall back to the 1.x spelling when it is unavailable.
    if hasattr(tf, "random") and hasattr(tf.random, "set_seed"):
        tf.random.set_seed(42)
    else:
        tf.set_random_seed(42)

    with pyspark.sql.SparkSession.builder.getOrCreate() as spark:
        als_model = mlflow.spark.load_model(als_model_uri).stages[0]

        ratings_df = spark.read.parquet(ratings_data)
        training_df, test_df = ratings_df.randomSplit([0.8, 0.2], seed=42)
        training_df.cache()
        test_df.cache()

        # Count once and reuse, instead of triggering the action twice.
        training_nrows = training_df.count()
        test_nrows = test_df.count()
        mlflow.log_metric("training_nrows", training_nrows)
        mlflow.log_metric("test_nrows", test_nrows)
        print(f"Training: {training_nrows}, test: {test_nrows}")

        user_factors = als_model.userFactors.selectExpr("id as userId", "features as uFeatures")
        item_factors = als_model.itemFactors.selectExpr("id as movieId", "features as iFeatures")

        # Item and user factor vectors are concatenated into a single
        # feature vector (item factors first, then user factors).
        def concat_arrays(*args):
            return list(chain(*args))

        concat_arrays_udf = udf(concat_arrays, ArrayType(FloatType()))

        def to_feature_frame(ratings):
            """Join ratings with both factor tables and build ``features``."""
            joined = ratings.join(item_factors, on="movieId").join(user_factors, on="userId")
            return joined.select(
                "userId",
                "movieId",
                concat_arrays_udf(col("iFeatures"), col("uFeatures")).alias("features"),
                col("rating").cast("float"),
            )

        pandas_df = to_feature_frame(training_df).toPandas()
        pandas_test_df = to_feature_frame(test_df).toPandas()

        # Expand the 'features' vector so that each element gets its own
        # column. This is what the neural network is trained on.
        x_test = pd.DataFrame(pandas_test_df.features.values.tolist(), index=pandas_test_df.index)
        x_train = pd.DataFrame(pandas_df.features.values.tolist(), index=pandas_df.index)

        # Show matrix for example.
        print("Training matrix:")
        print(x_train)

        # Two fully connected hidden layers followed by a linear output.
        # The input width is taken from the data rather than hard-coded, so
        # ALS models of any rank work (24 columns for the usual rank of 12).
        model = Sequential()
        model.add(Dense(30, input_dim=x_train.shape[1], activation="relu"))
        model.add(Dense(hidden_units, activation="relu"))
        model.add(Dense(1, activation="linear"))

        # ``learning_rate`` replaces the deprecated ``lr`` keyword.
        model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=0.0001))

        early_stopping = EarlyStopping(
            monitor="val_loss", min_delta=0.0001, patience=2, mode="auto"
        )

        model.fit(
            x_train,
            pandas_df["rating"],
            validation_split=0.2,
            verbose=2,
            epochs=3,
            batch_size=128,
            shuffle=False,
            callbacks=[early_stopping],
        )

        train_mse = model.evaluate(x_train, pandas_df["rating"], verbose=2)
        test_mse = model.evaluate(x_test, pandas_test_df["rating"], verbose=2)
        mlflow.log_metric("test_mse", test_mse)
        mlflow.log_metric("train_mse", train_mse)

        print(f"The model had a MSE on the test set of {test_mse}")
        mlflow.tensorflow.log_model(model, "keras-model")
if __name__ == "__main__":
    # train_keras() has three required parameters, so calling it with no
    # arguments raises TypeError. Collect them from the command line with
    # click (already imported by this file) and forward them unchanged.
    @click.command()
    @click.option(
        "--ratings-data",
        required=True,
        help="Path readable by Spark to the ratings Parquet file",
    )
    @click.option(
        "--als-model-uri",
        required=True,
        help="URI of the ALS model to load with mlflow.spark",
    )
    @click.option(
        "--hidden-units",
        default=20,
        type=int,
        show_default=True,
        help="Number of units in the second hidden layer",
    )
    def _cli(ratings_data, als_model_uri, hidden_units):
        train_keras(ratings_data, als_model_uri, hidden_units)

    _cli()