"""Helpers for fetching weather and air-quality data, plotting PM2.5
predictions, and cleaning up Hopsworks resources."""

import datetime
import json
import os
import sys
import time
from pathlib import Path

import hopsworks
import hsfs
import matplotlib.pyplot as plt
import openmeteo_requests
import pandas as pd
import requests
import requests_cache
from geopy.geocoders import Nominatim
from matplotlib.patches import Patch
from matplotlib.ticker import MultipleLocator
from retry_requests import retry

print(sys.path)

# Column names used for the weather features. The hourly forecast is stored
# under these same (daily-style) names so both frames share one schema.
_WEATHER_COLUMNS = [
    "temperature_2m_mean",
    "precipitation_sum",
    "wind_speed_10m_max",
    "wind_direction_10m_dominant",
]


def _open_meteo_client(expire_after):
    """Return an Open-Meteo client using a cached session that retries on error."""
    cache_session = requests_cache.CachedSession('.cache', expire_after=expire_after)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    return openmeteo_requests.Client(session=retry_session)


def _print_location_summary(response):
    """Print the coordinates, elevation and timezone reported by the API."""
    print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
    print(f"Elevation {response.Elevation()} m asl")
    print(f"Timezone {response.Timezone()} {response.TimezoneAbbreviation()}")
    print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")


def _weather_frame(series):
    """Build a DataFrame from an Open-Meteo daily/hourly series, dropping NaN rows.

    The variables are read positionally, so the order of the requested
    variables must match the order of ``_WEATHER_COLUMNS``.
    """
    frame_data = {"date": pd.date_range(
        start=pd.to_datetime(series.Time(), unit="s"),
        end=pd.to_datetime(series.TimeEnd(), unit="s"),
        freq=pd.Timedelta(seconds=series.Interval()),
        inclusive="left",
    )}
    for position, column in enumerate(_WEATHER_COLUMNS):
        frame_data[column] = series.Variables(position).ValuesAsNumpy()
    return pd.DataFrame(data=frame_data).dropna()


def get_historical_weather(city, start_date, end_date, latitude, longitude):
    """Fetch daily historical weather from the Open-Meteo archive API.

    Args:
        city: City name; written to the ``city`` column of the result.
        start_date: Start of the requested period (passed to the API as-is).
        end_date: End of the requested period (passed to the API as-is).
        latitude: Latitude of the location.
        longitude: Longitude of the location.

    Returns:
        DataFrame with ``date``, the four weather feature columns and ``city``;
        rows containing NaN values are dropped.
    """
    # Archive responses are cached with expire_after=-1.
    openmeteo = _open_meteo_client(expire_after=-1)

    # The order of variables in "daily" is important: values are assigned to
    # columns by position in _weather_frame.
    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "daily": ["temperature_2m_mean", "precipitation_sum", "wind_speed_10m_max", "wind_direction_10m_dominant"],
    }
    responses = openmeteo.weather_api(url, params=params)

    # Process first location. Add a for-loop for multiple locations or weather models
    response = responses[0]
    _print_location_summary(response)

    daily_dataframe = _weather_frame(response.Daily())
    daily_dataframe['city'] = city
    return daily_dataframe


def get_hourly_weather_forecast(city, latitude, longitude):
    """Fetch the hourly weather forecast from the Open-Meteo ECMWF endpoint.

    Args:
        city: Unused by this function; kept for interface compatibility.
        latitude: Latitude of the location.
        longitude: Longitude of the location.

    Returns:
        DataFrame with ``date`` and the hourly values stored under the
        daily-style feature column names; rows containing NaN are dropped.
    """
    # Forecast responses are cached for one hour.
    openmeteo = _open_meteo_client(expire_after=3600)

    # The order of variables in "hourly" is important: values are assigned to
    # columns by position in _weather_frame.
    url = "https://api.open-meteo.com/v1/ecmwf"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": ["temperature_2m", "precipitation", "wind_speed_10m", "wind_direction_10m"],
    }
    responses = openmeteo.weather_api(url, params=params)

    # Process first location. Add a for-loop for multiple locations or weather models
    response = responses[0]
    _print_location_summary(response)

    return _weather_frame(response.Hourly())


def get_city_coordinates(city_name: str):
    """
    Takes city name and returns its latitude and longitude (rounded to 2 digits after dot).

    Raises:
        ValueError: If the geocoder returns no result for ``city_name``.
    """
    # Initialize Nominatim API (for getting lat and long of the city)
    geolocator = Nominatim(user_agent="MyApp")
    location = geolocator.geocode(city_name)
    if location is None:
        # geocode() yields None when nothing matches; fail with a clear message
        # instead of an AttributeError on the attribute access below.
        raise ValueError(f"Could not geocode city: {city_name!r}")

    return round(location.latitude, 2), round(location.longitude, 2)


def trigger_request(url: str, timeout: float = 30.0):
    """GET ``url`` and return the decoded JSON body.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot block the caller indefinitely.

    Raises:
        requests.exceptions.RequestException: If the status code is not 200
            (or, via its subclasses, if the request itself fails or times out).
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        print("Failed to retrieve data. Status Code:", response.status_code)
        raise requests.exceptions.RequestException(response.status_code)

    # Extract the JSON content from the response
    return response.json()


def get_pm25(aqicn_url: str, country: str, city: str, street: str, day: datetime.date, AQI_API_KEY: str):
    """
    Returns DataFrame with air quality (pm25) as dataframe

    The sensor URL is tried first; on an 'Unknown station' answer the request
    is retried with ``country/street`` and then ``country/city/street``.

    Raises:
        requests.exceptions.RequestException: If the final response status is
            not 'ok' (or a request fails inside ``trigger_request``).
    """
    # The API endpoint URL
    url = f"{aqicn_url}/?token={AQI_API_KEY}"

    # Make a GET request to fetch the data from the API
    data = trigger_request(url)

    # if we get 'Unknown station' response then retry with country/street in url
    if data['data'] == "Unknown station":
        url1 = f"https://api.waqi.info/feed/{country}/{street}/?token={AQI_API_KEY}"
        data = trigger_request(url1)

    # still unknown: retry with country/city/street in url
    if data['data'] == "Unknown station":
        url2 = f"https://api.waqi.info/feed/{country}/{city}/{street}/?token={AQI_API_KEY}"
        data = trigger_request(url2)

    # Check if the API response contains the data
    if data['status'] != 'ok':
        print("Error: There may be an incorrect URL for your Sensor or it is not contactable right now. The API response does not contain data. Error message:", data['data'])
        raise requests.exceptions.RequestException(data['data'])

    # Extract the air quality data
    aqi_data = data['data']
    aq_today_df = pd.DataFrame()
    # A missing pm25 reading becomes None here and NaN after the float cast.
    aq_today_df['pm25'] = [aqi_data['iaqi'].get('pm25', {}).get('v', None)]
    aq_today_df['pm25'] = aq_today_df['pm25'].astype('float32')
    aq_today_df['country'] = country
    aq_today_df['city'] = city
    aq_today_df['street'] = street
    aq_today_df['date'] = day
    aq_today_df['date'] = pd.to_datetime(aq_today_df['date'])
    aq_today_df['url'] = aqicn_url
    return aq_today_df


def plot_air_quality_forecast(city: str, street: str, df: pd.DataFrame, file_path: str, hindcast=False):
    """Plot predicted PM2.5 on a log scale and save the figure.

    Args:
        city: City name used in the title.
        street: Street name used in the title.
        df: Frame with ``date`` and ``predicted_pm25`` columns (and ``pm25``
            when ``hindcast`` is set).
        file_path: Where the figure is saved; an existing file is overwritten.
        hindcast: When true, the actual ``pm25`` values are plotted as well.

    Returns:
        The ``matplotlib.pyplot`` module, as the original interface does.
    """
    _, ax = plt.subplots(figsize=(10, 6))

    day = pd.to_datetime(df['date']).dt.date
    # Plot each column separately in matplotlib
    ax.plot(day, df['predicted_pm25'], label='Predicted PM2.5', color='red', linewidth=2, marker='o', markersize=5, markerfacecolor='blue')

    # Set the y-axis to a logarithmic scale
    ax.set_yscale('log')
    ax.set_yticks([0, 10, 25, 50, 100, 250, 500])
    ax.get_yaxis().set_major_formatter(plt.ScalarFormatter())
    ax.set_ylim(bottom=1)

    # Set the labels and title
    ax.set_xlabel('Date')
    ax.set_title(f"PM2.5 Predicted (Logarithmic Scale) for {city}, {street}")
    ax.set_ylabel('PM2.5')

    colors = ['green', 'yellow', 'orange', 'red', 'purple', 'darkred']
    labels = ['Good', 'Moderate', 'Unhealthy for Some', 'Unhealthy', 'Very Unhealthy', 'Hazardous']
    ranges = [(0, 49), (50, 99), (100, 149), (150, 199), (200, 299), (300, 500)]
    for color, (start, end) in zip(colors, ranges):
        ax.axhspan(start, end, color=color, alpha=0.3)

    # Add a legend for the different Air Quality Categories
    patches = [
        Patch(color=color, label=f"{label}: {low}-{high}")
        for color, label, (low, high) in zip(colors, labels, ranges)
    ]
    legend1 = ax.legend(handles=patches, loc='upper right', title="Air Quality Categories", fontsize='x-small')

    # Aim for ~10 annotated values on x-axis, will work for both forecasts and hindcasts
    if len(df.index) > 11:
        every_x_tick = len(df.index) / 10
        ax.xaxis.set_major_locator(MultipleLocator(every_x_tick))

    plt.xticks(rotation=45)

    if hindcast:
        ax.plot(day, df['pm25'], label='Actual PM2.5', color='black', linewidth=2, marker='^', markersize=5, markerfacecolor='grey')
        # Creating the line legend replaces the category legend on the axes,
        # so the category legend is added back as a separate artist.
        ax.legend(loc='upper left', fontsize='x-small')
        ax.add_artist(legend1)

    # Ensure everything is laid out neatly
    plt.tight_layout()

    # Save the figure, overwriting any existing file with the same name
    plt.savefig(file_path)
    return plt


def delete_feature_groups(fs, name):
    """Delete every version of the feature group ``name``; report if none exists."""
    try:
        for fg in fs.get_feature_groups(name):
            fg.delete()
            print(f"Deleted {fg.name}/{fg.version}")
    except hsfs.client.exceptions.RestAPIError:
        print(f"No {name} feature group found")


def delete_feature_views(fs, name):
    """Delete every version of the feature view ``name``; report if none exists."""
    try:
        for fv in fs.get_feature_views(name):
            fv.delete()
            print(f"Deleted {fv.name}/{fv.version}")
    except hsfs.client.exceptions.RestAPIError:
        print(f"No {name} feature view found")


def delete_models(mr, name):
    """Delete every model called ``name`` from the model registry ``mr``."""
    models = mr.get_models(name)
    if not models:
        print(f"No {name} model found")
    for model in models:
        model.delete()
        print(f"Deleted model {model.name}/{model.version}")


def delete_secrets(proj, name):
    """Delete the secret ``name`` of project ``proj``; report if it is missing."""
    secrets = secrets_api(proj.name)
    try:
        secret = secrets.get_secret(name)
        secret.delete()
        print(f"Deleted secret {name}")
    except hopsworks.client.exceptions.RestAPIError:
        print(f"No {name} secret found")


# WARNING - this will wipe out all your feature data and models
def purge_project(proj):
    """Remove the air-quality feature views, feature groups, model and secret."""
    fs = proj.get_feature_store()
    mr = proj.get_model_registry()

    # Delete Feature Views before deleting the feature groups
    delete_feature_views(fs, "air_quality_fv")

    # Delete ALL Feature Groups
    delete_feature_groups(fs, "air_quality")
    delete_feature_groups(fs, "weather")
    delete_feature_groups(fs, "aq_predictions")

    # Delete all Models
    delete_models(mr, "air_quality_xgboost_model")
    delete_secrets(proj, "SENSOR_LOCATION_JSON")


def secrets_api(proj):
    """Return the secrets API of a connection opened for project name ``proj``.

    The API key is read from the ``HOPSWORKS_API_KEY`` environment variable.
    """
    host = "c.app.hopsworks.ai"
    api_key = os.environ.get('HOPSWORKS_API_KEY')
    conn = hopsworks.connection(host=host, project=proj, api_key_value=api_key)
    return conn.get_secrets_api()


def check_file_path(file_path):
    """Print whether a regular file exists at ``file_path``."""
    my_file = Path(file_path)
    if not my_file.is_file():
        print(f"Error. File not found at the path: {file_path} ")
    else:
        print(f"File successfully found at the path: {file_path}")


def backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, model):
    """Predict PM2.5 for the 10 most recent weather rows and store them for monitoring.

    Args:
        weather_fg: Feature group whose ``read()`` returns the weather features.
        air_quality_df: Frame with ``date``, ``pm25``, ``street`` and
            ``country`` columns. Its ``date`` column is converted to datetime
            in place, as in the original implementation.
        monitor_fg: Feature group that receives the predictions (without the
            ``pm25`` column).
        model: Object with a ``predict`` method taking the weather features.

    Returns:
        The merged frame including the actual ``pm25`` values.
    """
    features_df = weather_fg.read()
    features_df = features_df.sort_values(by=['date'], ascending=True)
    # Copy so the new column is written to an independent frame, not a slice.
    features_df = features_df.tail(10).copy()
    features_df['predicted_pm25'] = model.predict(features_df[_WEATHER_COLUMNS])

    # Bring both date columns to timezone-naive datetimes so they can be joined.
    air_quality_df['date'] = pd.to_datetime(air_quality_df['date'])
    features_df['date'] = features_df['date'].dt.tz_convert(None).astype('datetime64[ns]')

    df = pd.merge(features_df, air_quality_df[['date', 'pm25', 'street', 'country']], on="date")
    df['days_before_forecast_day'] = 1
    hindcast_df = df
    # drop() returns a new frame, so hindcast_df keeps the pm25 column.
    df = df.drop('pm25', axis=1)
    monitor_fg.insert(df, write_options={"wait_for_job": True})
    return hindcast_df