Spaces:
Running
Running
import os | |
import datetime | |
import time | |
import requests | |
import pandas as pd | |
import json | |
from geopy.geocoders import Nominatim | |
import matplotlib.pyplot as plt | |
from matplotlib.patches import Patch | |
from matplotlib.ticker import MultipleLocator | |
import openmeteo_requests | |
import requests_cache | |
from retry_requests import retry | |
import hopsworks | |
import hsfs | |
from pathlib import Path | |
import sys | |
# Debug output: prints the interpreter's module search path whenever this module is imported.
print(sys.path)
def get_historical_weather(city, start_date, end_date, latitude, longitude):
    """Download daily historical weather for one location from the Open-Meteo archive API.

    Args:
        city: Label written to the ``city`` column of every returned row.
        start_date: Start of the requested period, sent as the ``start_date`` parameter.
        end_date: End of the requested period, sent as the ``end_date`` parameter.
        latitude: Latitude of the location.
        longitude: Longitude of the location.

    Returns:
        A DataFrame with ``date``, ``temperature_2m_mean``, ``precipitation_sum``,
        ``wind_speed_10m_max``, ``wind_direction_10m_dominant`` and ``city``
        columns. Rows containing any missing value are dropped.
    """
    # Requested daily variables. The response lists them in request order, so
    # this one tuple drives both the request and the column extraction below.
    daily_variables = (
        "temperature_2m_mean",
        "precipitation_sum",
        "wind_speed_10m_max",
        "wind_direction_10m_dominant",
    )

    # Cached HTTP session (cache entries never expire: expire_after=-1),
    # wrapped with up to 5 retries on error.
    session = retry(
        requests_cache.CachedSession('.cache', expire_after=-1),
        retries=5,
        backoff_factor=0.2,
    )
    client = openmeteo_requests.Client(session=session)

    query = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "daily": list(daily_variables),
    }
    # A single location is requested, so only the first response is used.
    location = client.weather_api("https://archive-api.open-meteo.com/v1/archive", params=query)[0]

    print(f"Coordinates {location.Latitude()}°N {location.Longitude()}°E")
    print(f"Elevation {location.Elevation()} m asl")
    print(f"Timezone {location.Timezone()} {location.TimezoneAbbreviation()}")
    print(f"Timezone difference to GMT+0 {location.UtcOffsetSeconds()} s")

    daily = location.Daily()
    values = [daily.Variables(idx).ValuesAsNumpy() for idx in range(len(daily_variables))]

    # One timestamp per interval in [Time, TimeEnd); the times are Unix seconds.
    columns = {
        "date": pd.date_range(
            start=pd.to_datetime(daily.Time(), unit="s"),
            end=pd.to_datetime(daily.TimeEnd(), unit="s"),
            freq=pd.Timedelta(seconds=daily.Interval()),
            inclusive="left",
        )
    }
    columns.update(zip(daily_variables, values))

    frame = pd.DataFrame(data=columns).dropna()
    frame['city'] = city
    return frame
def get_hourly_weather_forecast(city, latitude, longitude):
    """Download the hourly ECMWF forecast for one location from the Open-Meteo API.

    Args:
        city: Not used in the body; no ``city`` column is added to the result.
        latitude: Latitude of the location.
        longitude: Longitude of the location.

    Returns:
        A DataFrame with a ``date`` column plus the four forecast variables,
        stored under the column names listed in ``output_columns`` below.
        Rows containing any missing value are dropped.
    """
    # Hourly API variable -> output column name. Insertion order is the
    # request order, which is also the order of hourly.Variables(i) in the
    # response. The output reuses the daily column names -- presumably so the
    # forecast lines up with the daily historical features; confirm with callers.
    output_columns = {
        "temperature_2m": "temperature_2m_mean",
        "precipitation": "precipitation_sum",
        "wind_speed_10m": "wind_speed_10m_max",
        "wind_direction_10m": "wind_direction_10m_dominant",
    }

    # Cached HTTP session (entries expire after 3600 s), wrapped with up to
    # 5 retries on error.
    session = retry(
        requests_cache.CachedSession('.cache', expire_after=3600),
        retries=5,
        backoff_factor=0.2,
    )
    client = openmeteo_requests.Client(session=session)

    query = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": list(output_columns),
    }
    # A single location is requested, so only the first response is used.
    forecast = client.weather_api("https://api.open-meteo.com/v1/ecmwf", params=query)[0]

    print(f"Coordinates {forecast.Latitude()}°N {forecast.Longitude()}°E")
    print(f"Elevation {forecast.Elevation()} m asl")
    print(f"Timezone {forecast.Timezone()} {forecast.TimezoneAbbreviation()}")
    print(f"Timezone difference to GMT+0 {forecast.UtcOffsetSeconds()} s")

    hourly = forecast.Hourly()
    series = [hourly.Variables(idx).ValuesAsNumpy() for idx in range(len(output_columns))]

    # One timestamp per interval in [Time, TimeEnd); the times are Unix seconds.
    columns = {
        "date": pd.date_range(
            start=pd.to_datetime(hourly.Time(), unit="s"),
            end=pd.to_datetime(hourly.TimeEnd(), unit="s"),
            freq=pd.Timedelta(seconds=hourly.Interval()),
            inclusive="left",
        )
    }
    columns.update(zip(output_columns.values(), series))

    return pd.DataFrame(data=columns).dropna()
def get_city_coordinates(city_name: str):
    """Look up a city's coordinates with the Nominatim geocoder.

    Args:
        city_name: Free-form place name passed to ``Nominatim.geocode``.

    Returns:
        A ``(latitude, longitude)`` tuple, each rounded to 2 decimal places.

    Raises:
        ValueError: If the geocoder finds no match for ``city_name``.
    """
    geolocator = Nominatim(user_agent="MyApp")
    location = geolocator.geocode(city_name)
    # geocode() returns None when nothing matches. Fail with a clear message
    # instead of an AttributeError on ``None.latitude``.
    if location is None:
        raise ValueError(f"Could not find coordinates for city: {city_name!r}")
    return round(location.latitude, 2), round(location.longitude, 2)
def trigger_request(url: str, timeout=30):
    """Send a GET request and return the decoded JSON body.

    Args:
        url: Fully formed request URL. It may embed an API token, so it is not printed.
        timeout: Seconds to wait for the server (passed to ``requests.get``).
            Pass ``None`` to wait indefinitely.

    Returns:
        The parsed JSON body of a 200 response.

    Raises:
        requests.exceptions.RequestException: If the status code is not 200.
            Network failures and timeouts raise subclasses of it from ``requests``.
    """
    # Without a timeout, requests blocks forever on an unresponsive server.
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        print("Failed to retrieve data. Status Code:", response.status_code)
        raise requests.exceptions.RequestException(response.status_code)
    return response.json()
def get_pm25(aqicn_url: str, country: str, city: str, street: str, day: datetime.date, AQI_API_KEY: str):
    """Fetch the latest PM2.5 reading for one sensor from the AQICN API.

    The sensor's own feed URL is queried first. While the API answers with
    ``"Unknown station"``, the lookup is retried with the
    ``<country>/<street>`` feed, then with the ``<country>/<city>/<street>`` feed.

    Args:
        aqicn_url: Feed URL of the sensor; the token is appended to it.
        country: Used in the fallback URLs and copied into the result.
        city: Used in the second fallback URL and copied into the result.
        street: Used in the fallback URLs and copied into the result.
        day: Stored, via ``pd.to_datetime``, in the ``date`` column.
        AQI_API_KEY: API token appended to every request URL.

    Returns:
        A one-row DataFrame with columns ``pm25`` (float32; NaN when the
        response has no pm25 value), ``country``, ``city``, ``street``,
        ``date`` and ``url``.

    Raises:
        requests.exceptions.RequestException: If the final response's
            ``status`` is not ``'ok'``, or (via ``trigger_request``) when a
            request returns a non-200 status.
    """
    data = trigger_request(f"{aqicn_url}/?token={AQI_API_KEY}")

    # Each fallback is tried only if the previous answer was "Unknown station".
    fallbacks = (
        f"https://api.waqi.info/feed/{country}/{street}/?token={AQI_API_KEY}",
        f"https://api.waqi.info/feed/{country}/{city}/{street}/?token={AQI_API_KEY}",
    )
    for fallback_url in fallbacks:
        if data['data'] != "Unknown station":
            break
        data = trigger_request(fallback_url)

    if data['status'] != 'ok':
        print("Error: There may be an incorrect URL for your Sensor or it is not contactable right now. The API response does not contain data. Error message:", data['data'])
        raise requests.exceptions.RequestException(data['data'])

    reading = data['data']['iaqi'].get('pm25', {}).get('v', None)
    result = pd.DataFrame({'pm25': [reading]})
    result['pm25'] = result['pm25'].astype('float32')
    result = result.assign(country=country, city=city, street=street, date=day)
    result['date'] = pd.to_datetime(result['date'])
    result['url'] = aqicn_url
    return result
def plot_air_quality_forecast(city: str, street: str, df: pd.DataFrame, file_path: str, hindcast=False):
    """Plot predicted (and optionally observed) PM2.5 on a log scale and save it to a file.

    Args:
        city: City name shown in the title.
        street: Street name shown in the title.
        df: Must have ``date`` and ``predicted_pm25`` columns. When
            ``hindcast`` is truthy it must also have a ``pm25`` column.
        file_path: Destination passed to ``plt.savefig``. An existing file
            with the same name is overwritten.
        hindcast: When truthy, also plot observed ``pm25`` with its own legend.

    Returns:
        The ``matplotlib.pyplot`` module. The figure is not closed here.
    """
    fig, ax = plt.subplots(figsize=(10, 6))

    day = pd.to_datetime(df['date']).dt.date
    ax.plot(day, df['predicted_pm25'], label='Predicted PM2.5', color='red', linewidth=2, marker='o', markersize=5, markerfacecolor='blue')

    # Logarithmic y-axis. Tick values must be positive on a log scale (log(0)
    # is undefined), and the axis starts at 1, so 0 is not listed.
    ax.set_yscale('log')
    ax.set_yticks([10, 25, 50, 100, 250, 500])
    ax.get_yaxis().set_major_formatter(plt.ScalarFormatter())
    ax.set_ylim(bottom=1)

    ax.set_xlabel('Date')
    ax.set_title(f"PM2.5 Predicted (Logarithmic Scale) for {city}, {street}")
    ax.set_ylabel('PM2.5')

    # Shaded horizontal bands for the air-quality categories, plus a legend for them.
    colors = ['green', 'yellow', 'orange', 'red', 'purple', 'darkred']
    labels = ['Good', 'Moderate', 'Unhealthy for Some', 'Unhealthy', 'Very Unhealthy', 'Hazardous']
    ranges = [(0, 49), (50, 99), (100, 149), (150, 199), (200, 299), (300, 500)]
    for color, (start, end) in zip(colors, ranges):
        ax.axhspan(start, end, color=color, alpha=0.3)
    patches = [
        Patch(color=color, label=f"{label}: {start}-{end}")
        for color, label, (start, end) in zip(colors, labels, ranges)
    ]
    legend1 = ax.legend(handles=patches, loc='upper right', title="Air Quality Categories", fontsize='x-small')

    # Aim for ~10 annotated values on the x-axis; works for both forecasts and hindcasts.
    if len(df.index) > 11:
        ax.xaxis.set_major_locator(MultipleLocator(len(df.index) / 10))
    plt.xticks(rotation=45)

    if hindcast:
        ax.plot(day, df['pm25'], label='Actual PM2.5', color='black', linewidth=2, marker='^', markersize=5, markerfacecolor='grey')
        # A second ax.legend() call replaces the first legend, so re-attach the
        # category legend as an extra artist to keep both visible.
        ax.legend(loc='upper left', fontsize='x-small')
        ax.add_artist(legend1)

    plt.tight_layout()
    # Save the figure, overwriting any existing file with the same name.
    plt.savefig(file_path)
    return plt
def delete_feature_groups(fs, name):
    """Delete every version of the feature group ``name`` from feature store ``fs``.

    Prints "No <name> feature group found" when the lookup raises
    ``RestAPIError`` or returns nothing. Otherwise prints one line per deleted
    version. A version whose delete fails with ``RestAPIError`` is reported
    and skipped, and the remaining versions are still processed.
    """
    # Only the lookup decides "not found"; delete failures are reported separately.
    try:
        feature_groups = fs.get_feature_groups(name)
    except hsfs.client.exceptions.RestAPIError:
        feature_groups = []
    if not feature_groups:
        print(f"No {name} feature group found")
        return
    for fg in feature_groups:
        try:
            fg.delete()
        except hsfs.client.exceptions.RestAPIError as err:
            print(f"Failed to delete {fg.name}/{fg.version}: {err}")
            continue
        print(f"Deleted {fg.name}/{fg.version}")
def delete_feature_views(fs, name):
    """Delete every version of the feature view ``name`` from feature store ``fs``.

    Prints "No <name> feature view found" when the lookup raises
    ``RestAPIError`` or returns nothing. Otherwise prints one line per deleted
    version. A version whose delete fails with ``RestAPIError`` is reported
    and skipped, and the remaining versions are still processed.
    """
    # Only the lookup decides "not found"; delete failures are reported separately.
    try:
        feature_views = fs.get_feature_views(name)
    except hsfs.client.exceptions.RestAPIError:
        feature_views = []
    if not feature_views:
        print(f"No {name} feature view found")
        return
    for fv in feature_views:
        try:
            fv.delete()
        except hsfs.client.exceptions.RestAPIError as err:
            print(f"Failed to delete {fv.name}/{fv.version}: {err}")
            continue
        print(f"Deleted {fv.name}/{fv.version}")
def delete_models(mr, name):
    """Delete every version of model ``name`` found in model registry ``mr``.

    Prints "No <name> model found" when the registry returns an empty
    (falsy) result, and one "Deleted model <name>/<version>" line per
    deleted version.
    """
    matches = mr.get_models(name)
    if not matches:
        print(f"No {name} model found")
    for entry in matches:
        entry.delete()
        print(f"Deleted model {entry.name}/{entry.version}")
def delete_secrets(proj, name):
    """Delete the Hopsworks secret ``name`` using a secrets API for project ``proj``.

    Prints "No <name> secret found" when the lookup raises ``RestAPIError`` or
    returns ``None``. A delete that fails with ``RestAPIError`` is reported
    with its own message instead of being mistaken for a missing secret.
    """
    secrets = secrets_api(proj.name)
    # Only the lookup decides "not found"; the delete is handled separately.
    try:
        secret = secrets.get_secret(name)
    except hopsworks.client.exceptions.RestAPIError:
        secret = None
    if secret is None:
        print(f"No {name} secret found")
        return
    try:
        secret.delete()
    except hopsworks.client.exceptions.RestAPIError as err:
        print(f"Failed to delete secret {name}: {err}")
        return
    print(f"Deleted secret {name}")
# WARNING - this will wipe out all your feature data and models
def purge_project(proj):
    """Delete the air-quality feature view, feature groups, model and secret of ``proj``.

    Deletion order: the feature view first, then the feature groups
    "air_quality", "weather" and "aq_predictions", then the model, then the
    secret.
    """
    feature_store = proj.get_feature_store()
    model_registry = proj.get_model_registry()

    # Feature views are removed before any feature group.
    delete_feature_views(feature_store, "air_quality_fv")
    for group_name in ("air_quality", "weather", "aq_predictions"):
        delete_feature_groups(feature_store, group_name)

    delete_models(model_registry, "air_quality_xgboost_model")
    delete_secrets(proj, "SENSOR_LOCATION_JSON")
def secrets_api(proj, host="c.app.hopsworks.ai"):
    """Open a Hopsworks connection to project ``proj`` and return its secrets API.

    Args:
        proj: Project name, passed as ``project`` to ``hopsworks.connection``.
        host: Hopsworks host to connect to (default "c.app.hopsworks.ai").

    Returns:
        The result of ``get_secrets_api()`` on the new connection.

    Note:
        The API key is read from the ``HOPSWORKS_API_KEY`` environment
        variable. If it is unset, ``None`` is passed as ``api_key_value``;
        how ``hopsworks.connection`` handles that is not visible here.
    """
    api_key = os.environ.get('HOPSWORKS_API_KEY')
    conn = hopsworks.connection(host=host, project=proj, api_key_value=api_key)
    return conn.get_secrets_api()
def check_file_path(file_path):
    """Report whether ``file_path`` points to an existing regular file.

    Prints a success or error message and also returns the result, so
    callers can act on it.

    Returns:
        True if the path exists and is a regular file, otherwise False
        (including when it is a directory).
    """
    found = Path(file_path).is_file()
    if found:
        print(f"File successfully found at the path: {file_path}")
    else:
        print(f"Error. File not found at the path: {file_path} ")
    return found
def backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, model):
    """Predict PM2.5 for the 10 most recent weather rows, insert the predictions, and return them with observations.

    Args:
        weather_fg: Object whose ``read()`` returns a DataFrame with a ``date``
            column and the four weather feature columns used below.
        air_quality_df: Observations with ``date``, ``pm25``, ``street`` and
            ``country`` columns. It is not modified.
        monitor_fg: Object whose ``insert`` receives the prediction rows.
        model: Object whose ``predict`` accepts the four weather feature columns.

    Returns:
        The weather rows joined with the observations on ``date``, with
        ``predicted_pm25`` and ``days_before_forecast_day`` (= 1) columns.
        The inserted frame is this one without the ``pm25`` column.
    """
    feature_cols = ['temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max', 'wind_direction_10m_dominant']

    # .copy() makes the 10-row tail an independent frame, so adding columns
    # below does not trigger pandas' SettingWithCopyWarning.
    features_df = weather_fg.read().sort_values(by=['date'], ascending=True).tail(10).copy()
    features_df['predicted_pm25'] = model.predict(features_df[feature_cols])

    # Normalise to tz-naive datetime64[ns]. tz_convert(None) only works on
    # tz-aware values (it raises TypeError on tz-naive ones), so apply it conditionally.
    dates = pd.to_datetime(features_df['date'])
    if dates.dt.tz is not None:
        dates = dates.dt.tz_convert(None)
    features_df['date'] = dates.astype('datetime64[ns]')

    # Convert dates on a copy so the caller's air_quality_df is left unchanged.
    observed = air_quality_df[['date', 'pm25', 'street', 'country']].copy()
    observed['date'] = pd.to_datetime(observed['date'])

    hindcast_df = pd.merge(features_df, observed, on="date")
    hindcast_df['days_before_forecast_day'] = 1
    # The observed pm25 column is dropped before inserting into the monitoring group.
    monitor_fg.insert(hindcast_df.drop('pm25', axis=1), write_options={"wait_for_job": True})
    return hindcast_df