import datetime
import json
import os

import hopsworks
import pandas as pd
from xgboost import XGBRegressor

from functions import util
# Set up
# Log in to Hopsworks with credentials taken from the environment and obtain
# the feature store handle. os.getenv yields None for an unset variable; the
# values are handed to hopsworks.login unchanged.
api_key = os.getenv('HOPSWORKS_API_KEY')
project_name = os.getenv('HOPSWORKS_PROJECT')
project = hopsworks.login(project=project_name, api_key_value=api_key)
fs = project.get_feature_store()

# NOTE(review): the argument list of this call was missing in the source (the
# parenthesis was left unclosed). `project.name` is the presumed original
# argument -- confirm against functions/util.py.
secrets = util.secrets_api(project.name)

# The sensor location is stored as one JSON-encoded secret; the script reads
# its 'country', 'city' and 'street' keys. The secret is fetched and parsed
# once (the source fetched it a second time with identical arguments).
location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
location = json.loads(location_str)
country = location['country']
city = location['city']
street = location['street']

AQI_API_KEY = secrets.get_secret("AQI_API_KEY").value

# NOTE(review): the left operand was missing in the source, leaving a bare
# negated timedelta. The current local time is the presumed original value --
# confirm. The zero-day offset is kept so the start date stays easy to shift.
today = datetime.datetime.now() - datetime.timedelta(0)

feature_view = fs.get_feature_view(
    name='air_quality_fv',
    version=1,
)
### Retrieve model
# Look up version 1 of the registered model and load its serialized form
# into a fresh XGBRegressor instance.
mr = project.get_model_registry()
retrieved_model = mr.get_model(
    name="air_quality_xgboost_model",
    version=1,
)

# NOTE(review): the right-hand side of this assignment was missing in the
# source. Downloading the registry artifacts to a local directory is the
# presumed original call -- confirm.
saved_model_dir = retrieved_model.download()

retrieved_xgboost_model = XGBRegressor()
retrieved_xgboost_model.load_model(os.path.join(saved_model_dir, "model.json"))
### Retrieve features
# Read the weather rows dated today or later; these are the inputs for the
# forecast.
weather_fg = fs.get_feature_group(
    name='weather',
    version=1,
)
today_timestamp = pd.to_datetime(today)

# NOTE(review): the left operand of the comparison was missing in the source.
# The feature group's `date` feature is the presumed original (the frame is
# later sorted by a 'date' column) -- confirm.
batch_data = weather_fg.filter(weather_fg.date >= today_timestamp).read()
### Predict and upload
# Score every weather row with the loaded model and tag it with the sensor
# location so it can be stored alongside other predictions.
batch_data['predicted_pm25'] = retrieved_xgboost_model.predict(
    batch_data[['temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max', 'wind_direction_10m_dominant']])
batch_data['street'] = street
batch_data['city'] = city
batch_data['country'] = country

# Sort by date BEFORE numbering the rows: the 1..N counter is assigned
# positionally, so numbering an unsorted frame would attach day offsets to
# the wrong dates whenever the read does not return rows in date order.
batch_data = batch_data.sort_values(by=['date'])
# Fill in the number of days before the date on which you made the forecast (base_date)
batch_data['days_before_forecast_day'] = range(1, len(batch_data) + 1)
#batch_data['date'] = batch_data['date'].dt.tz_convert(None).astype('datetime64[ns]')

plt = util.plot_air_quality_forecast(city, street, batch_data, file_path="./img/pm25_forecast.png")

# Store the predictions so they can later be compared with observed values.
monitor_fg = fs.get_or_create_feature_group(
    name='aq_predictions',
    description='Air Quality prediction monitoring',
    version=1,
    primary_key=['city', 'street', 'date', 'days_before_forecast_day'],
    event_time="date"
)
print(f"Batch data: {batch_data}")
# wait_for_job=True is requested so the insert completes before the
# read-back below.
monitor_fg.insert(batch_data, write_options={"wait_for_job": True})

# Keep only the predictions whose day counter is 1 for the hindcast.
monitoring_df = monitor_fg.filter(monitor_fg.days_before_forecast_day == 1).read()
# Hindcast monitoring
# Join the stored 1-day predictions with the observed pm25 values on date and
# plot predicted against actual.
air_quality_fg = fs.get_feature_group(
    name='air_quality',
    version=1,
)

# NOTE(review): the right-hand side of this assignment was missing in the
# source. Reading the whole feature group is the presumed original call
# (the result is indexed by 'date' and 'pm25' below) -- confirm.
air_quality_df = air_quality_fg.read()

outcome_df = air_quality_df[['date', 'pm25']]
preds_df = monitoring_df[['date', 'predicted_pm25']]
hindcast_df = pd.merge(preds_df, outcome_df, on="date")
hindcast_df = hindcast_df.sort_values(by=['date'])

# With no overlapping dates between predictions and outcomes there is
# nothing to plot, so fall back to the backfill helper.
if hindcast_df.empty:
    hindcast_df = util.backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, retrieved_xgboost_model)

plt = util.plot_air_quality_forecast(city, street, hindcast_df, file_path="./img/pm25_hindcast_1day.png", hindcast=True)