#!/usr/bin/env python
# coding: utf-8
# # **Air Quality** - Part 04: Batch Inference
#
# ## 🗒️ This notebook is divided into the following sections:
#
# 1. Download model and batch inference data
# 2. Make predictions, generate PNG for forecast
# 3. Store predictions in a monitoring feature group adn generate PNG for hindcast
# ## 📝 Imports
# In[1]:
import datetime
import pandas as pd
from xgboost import XGBRegressor
import hopsworks
import json
from functions import util
import os
# In[2]:
today = datetime.datetime.now() - datetime.timedelta(0)
tomorrow = today + datetime.timedelta(days = 1)
today
# ## 📡 Connect to Hopsworks Feature Store
# In[3]:
# os.environ["HOPSWORKS_API_KEY"] = ""
project = hopsworks.login()
fs = project.get_feature_store()
secrets = util.secrets_api(project.name)
location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
location = json.loads(location_str)
country=location['country']
city=location['city']
street=location['street']
# ## ⚙️ Feature View Retrieval
#
# In[4]:
feature_view = fs.get_feature_view(
name='air_quality_fv',
version=1,
)
# ## 🪝 Download the model from Model Registry
# In[5]:
mr = project.get_model_registry()
retrieved_model = mr.get_model(
name="air_quality_xgboost_model",
version=1,
)
# Download the saved model artifacts to a local directory
saved_model_dir = retrieved_model.download()
# In[6]:
# Loading the XGBoost regressor model and label encoder from the saved model directory
# retrieved_xgboost_model = joblib.load(saved_model_dir + "/xgboost_regressor.pkl")
retrieved_xgboost_model = XGBRegressor()
retrieved_xgboost_model.load_model(saved_model_dir + "/model.json")
# Displaying the retrieved XGBoost regressor model
retrieved_xgboost_model
# In[7]:
# Access the feature names of the trained XGBoost model
feature_names = retrieved_xgboost_model.get_booster().feature_names
# Print the feature names
print("Feature names:", feature_names)
# ## ✨ Get Weather Forecast Features with Feature View
#
#
# In[8]:
weather_fg = fs.get_feature_group(
name='weather',
version=1,
)
today_timestamp = pd.to_datetime(today)
batch_data = weather_fg.filter(weather_fg.date >= today_timestamp ).read()
batch_data
# ### Get Mean air quality for past days
# In[9]:
air_quality_fg = fs.get_feature_group(
name='air_quality',
version=1,
)
selected_features = air_quality_fg.select_all() #(['pm25']).join(weather_fg.select_all(), on=['city'])
selected_features = selected_features.read()
# In[10]:
selected_features = selected_features.sort_values(by='date').reset_index(drop=True)
# In[11]:
past_air_q_list = selected_features[['date', 'pm25']][-3:]['pm25'].tolist()
# In[12]:
batch_data = batch_data.sort_values(by='date').reset_index(drop=True)
# In[13]:
batch_data['past_air_quality'] = None
# In[14]:
batch_data
# ### 🤖 Making the predictions
# In[15]:
# Initialize an empty list to store predictions
predictions = []
# Iterate through each row of the DataFrame
for index, row in batch_data.iterrows():
past_air_quality_mean = sum(past_air_q_list)/3
# Extract the feature values for prediction as a 1D array
features = row[['past_air_quality', 'temperature_2m_mean', 'precipitation_sum',
'wind_speed_10m_max', 'wind_direction_10m_dominant']].values
# Reshape features to a 2D array (required by XGBoost's predict method)
features = features.reshape(1, -1)
# Make a prediction for the row
prediction = retrieved_xgboost_model.predict(features)
# Append the prediction to the list
predictions.append(prediction[0])
past_air_q_list.append(prediction[0])
past_air_q_list = past_air_q_list[1:]
# print(past_air_q_list)
batch_data.loc[index,'past_air_quality'] = past_air_quality_mean
# Add the predictions as a new column in the DataFrame
batch_data['predicted_pm25'] = predictions
# Display the updated DataFrame
batch_data
# In[16]:
# batch_data['predicted_pm25'] = retrieved_xgboost_model.predict(
# batch_data[['temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max', 'wind_direction_10m_dominant']])
# batch_data
# In[17]:
batch_data.info()
# ### 🤖 Saving the predictions (for monitoring) to a Feature Group
# In[18]:
batch_data['street'] = street
batch_data['city'] = city
batch_data['country'] = country
# Fill in the number of days before the date on which you made the forecast (base_date)
batch_data['days_before_forecast_day'] = range(1, len(batch_data)+1)
batch_data = batch_data.sort_values(by=['date'])
batch_data['date'] = batch_data['date'].dt.tz_convert(None).astype('datetime64[ns]')
batch_data
# In[19]:
batch_data.info()
# ### Create Forecast Graph
# Draw a graph of the predictions with dates as a PNG and save it to the github repo
# Show it on github pages
# In[20]:
file_path = "img/pm25_forecast.png"
plt = util.plot_air_quality_forecast(city, street, batch_data, file_path)
plt.show()
# In[21]:
# Get or create feature group
monitor_fg = fs.get_or_create_feature_group(
name='aq_predictions',
description='Air Quality prediction monitoring',
version=1,
primary_key=['city','street','date','days_before_forecast_day'],
event_time="date"
)
# In[22]:
monitor_fg.insert(batch_data, write_options={"wait_for_job": True})
# In[23]:
# We will create a hindcast chart for only the forecasts made 1 day beforehand
monitoring_df = monitor_fg.filter(monitor_fg.days_before_forecast_day == 1).read()
monitoring_df
# In[24]:
air_quality_fg = fs.get_feature_group(
name='air_quality',
version=1,
)
air_quality_df = air_quality_fg.read()
air_quality_df
# In[25]:
air_quality_df['date']
# In[26]:
monitoring_df['date']
# In[27]:
air_quality_df['date'] = pd.to_datetime(air_quality_df['date'])
monitoring_df['date'] = monitoring_df['date'].dt.tz_convert(None).astype('datetime64[ns]')
# In[28]:
weather_fg.read()
# In[29]:
air_quality_df
# In[30]:
monitor_fg.read()
# In[31]:
outcome_df = air_quality_df[['date', 'pm25']]
preds_df = monitoring_df[['date', 'predicted_pm25']]
hindcast_df = pd.merge(preds_df, outcome_df, on="date")
hindcast_df = hindcast_df.sort_values(by=['date'])
# If there are no outcomes for predictions yet, generate some predictions/outcomes from existing data
if len(hindcast_df) == 0:
hindcast_df = util.backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, retrieved_xgboost_model)
hindcast_df
# ### Plot the Hindcast comparing predicted with forecasted values (1-day prior forecast)
#
# __This graph will be empty to begin with - this is normal.__
#
# After a few days of predictions and observations, you will get data points in this graph.
# In[32]:
file_path = "img/pm25_hindcast_1day.png"
plt = util.plot_air_quality_forecast(city, street, hindcast_df, file_path, hindcast=True)
plt.show()
# %%