import os
import datetime
import time
import requests
import pandas as pd
import json
from geopy.geocoders import Nominatim
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from matplotlib.ticker import MultipleLocator
import openmeteo_requests
import requests_cache
from retry_requests import retry
import hopsworks
import hsfs
from pathlib import Path
import sys
print(sys.path)

def get_historical_weather(city, start_date, end_date, latitude, longitude):
    # latitude, longitude = get_city_coordinates(city)

    # Setup the Open-Meteo API client with cache and retry on error
    cache_session = requests_cache.CachedSession('.cache', expire_after=-1)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    openmeteo = openmeteo_requests.Client(session=retry_session)

    # Make sure all required weather variables are listed here
    # The order of variables in hourly or daily is important to assign them correctly below
    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "daily": ["temperature_2m_mean", "precipitation_sum", "wind_speed_10m_max", "wind_direction_10m_dominant"]
    }
    responses = openmeteo.weather_api(url, params=params)

    # Process the first location. Add a for-loop for multiple locations or weather models
    response = responses[0]
    print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
    print(f"Elevation {response.Elevation()} m asl")
    print(f"Timezone {response.Timezone()} {response.TimezoneAbbreviation()}")
    print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

    # Process daily data. The order of variables needs to be the same as requested.
    daily = response.Daily()
    daily_temperature_2m_mean = daily.Variables(0).ValuesAsNumpy()
    daily_precipitation_sum = daily.Variables(1).ValuesAsNumpy()
    daily_wind_speed_10m_max = daily.Variables(2).ValuesAsNumpy()
    daily_wind_direction_10m_dominant = daily.Variables(3).ValuesAsNumpy()

    daily_data = {"date": pd.date_range(
        start=pd.to_datetime(daily.Time(), unit="s"),
        end=pd.to_datetime(daily.TimeEnd(), unit="s"),
        freq=pd.Timedelta(seconds=daily.Interval()),
        inclusive="left"
    )}
    daily_data["temperature_2m_mean"] = daily_temperature_2m_mean
    daily_data["precipitation_sum"] = daily_precipitation_sum
    daily_data["wind_speed_10m_max"] = daily_wind_speed_10m_max
    daily_data["wind_direction_10m_dominant"] = daily_wind_direction_10m_dominant

    daily_dataframe = pd.DataFrame(data=daily_data)
    daily_dataframe = daily_dataframe.dropna()
    daily_dataframe['city'] = city
    return daily_dataframe
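
# Example usage (a sketch, not run at import time; the city name and date range are placeholders):
#   latitude, longitude = get_city_coordinates("Stockholm")
#   hist_df = get_historical_weather("Stockholm", "2024-01-01", "2024-01-31", latitude, longitude)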

def get_hourly_weather_forecast(city, latitude, longitude):
    # latitude, longitude = get_city_coordinates(city)

    # Setup the Open-Meteo API client with cache and retry on error
    cache_session = requests_cache.CachedSession('.cache', expire_after=3600)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    openmeteo = openmeteo_requests.Client(session=retry_session)

    # Make sure all required weather variables are listed here
    # The order of variables in hourly or daily is important to assign them correctly below
    url = "https://api.open-meteo.com/v1/ecmwf"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": ["temperature_2m", "precipitation", "wind_speed_10m", "wind_direction_10m"]
    }
    responses = openmeteo.weather_api(url, params=params)

    # Process the first location. Add a for-loop for multiple locations or weather models
    response = responses[0]
    print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
    print(f"Elevation {response.Elevation()} m asl")
    print(f"Timezone {response.Timezone()} {response.TimezoneAbbreviation()}")
    print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

    # Process hourly data. The order of variables needs to be the same as requested.
    hourly = response.Hourly()
    hourly_temperature_2m = hourly.Variables(0).ValuesAsNumpy()
    hourly_precipitation = hourly.Variables(1).ValuesAsNumpy()
    hourly_wind_speed_10m = hourly.Variables(2).ValuesAsNumpy()
    hourly_wind_direction_10m = hourly.Variables(3).ValuesAsNumpy()

    hourly_data = {"date": pd.date_range(
        start=pd.to_datetime(hourly.Time(), unit="s"),
        end=pd.to_datetime(hourly.TimeEnd(), unit="s"),
        freq=pd.Timedelta(seconds=hourly.Interval()),
        inclusive="left"
    )}
    # The column names match the daily historical features so the forecast can be fed to the same model
    hourly_data["temperature_2m_mean"] = hourly_temperature_2m
    hourly_data["precipitation_sum"] = hourly_precipitation
    hourly_data["wind_speed_10m_max"] = hourly_wind_speed_10m
    hourly_data["wind_direction_10m_dominant"] = hourly_wind_direction_10m

    hourly_dataframe = pd.DataFrame(data=hourly_data)
    hourly_dataframe = hourly_dataframe.dropna()
    return hourly_dataframe
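
# Example usage (a sketch; the coordinates are approximate placeholders for Stockholm):
#   forecast_df = get_hourly_weather_forecast("Stockholm", 59.33, 18.07)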

def get_city_coordinates(city_name: str):
    """
    Takes a city name and returns its latitude and longitude (rounded to 2 decimal places).
    """
    # Initialize the Nominatim geocoder (for getting the lat and long of the city)
    geolocator = Nominatim(user_agent="MyApp")
    city = geolocator.geocode(city_name)

    latitude = round(city.latitude, 2)
    longitude = round(city.longitude, 2)
    return latitude, longitude

def trigger_request(url: str):
    response = requests.get(url)
    if response.status_code == 200:
        # Extract the JSON content from the response
        data = response.json()
    else:
        print("Failed to retrieve data. Status Code:", response.status_code)
        raise requests.exceptions.RequestException(response.status_code)
    return data

def get_pm25(aqicn_url: str, country: str, city: str, street: str, day: datetime.date, AQI_API_KEY: str):
    """
    Returns a DataFrame with the air quality (pm25) measurement for the given sensor and day.
    """
    # The API endpoint URL
    url = f"{aqicn_url}/?token={AQI_API_KEY}"

    # Make a GET request to fetch the data from the API
    data = trigger_request(url)

    # If we get an 'Unknown station' response, retry with country/street (then country/city/street) in the URL
    if data['data'] == "Unknown station":
        url1 = f"https://api.waqi.info/feed/{country}/{street}/?token={AQI_API_KEY}"
        data = trigger_request(url1)

        if data['data'] == "Unknown station":
            url2 = f"https://api.waqi.info/feed/{country}/{city}/{street}/?token={AQI_API_KEY}"
            data = trigger_request(url2)

    # Check if the API response contains the data
    if data['status'] == 'ok':
        # Extract the air quality data
        aqi_data = data['data']
        aq_today_df = pd.DataFrame()
        aq_today_df['pm25'] = [aqi_data['iaqi'].get('pm25', {}).get('v', None)]
        aq_today_df['pm25'] = aq_today_df['pm25'].astype('float32')

        aq_today_df['country'] = country
        aq_today_df['city'] = city
        aq_today_df['street'] = street
        aq_today_df['date'] = day
        aq_today_df['date'] = pd.to_datetime(aq_today_df['date'])
        aq_today_df['url'] = aqicn_url
    else:
        print("Error: the API response does not contain data. The sensor URL may be incorrect, or the sensor may not be contactable right now. Error message:", data['data'])
        raise requests.exceptions.RequestException(data['data'])

    return aq_today_df
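
# Example usage (a sketch; the sensor feed URL, location names and API key lookup are placeholders):
#   aq_df = get_pm25("https://api.waqi.info/feed/@10009", "sweden", "stockholm", "hornsgatan-108",
#                    datetime.date.today(), os.environ["AQI_API_KEY"])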

def plot_air_quality_forecast(city: str, street: str, df: pd.DataFrame, file_path: str, hindcast=False):
    fig, ax = plt.subplots(figsize=(10, 6))

    day = pd.to_datetime(df['date']).dt.date
    # Plot the predictions in matplotlib
    ax.plot(day, df['predicted_pm25'], label='Predicted PM2.5', color='red', linewidth=2,
            marker='o', markersize=5, markerfacecolor='blue')

    # Set the y-axis to a logarithmic scale
    ax.set_yscale('log')
    ax.set_yticks([0, 10, 25, 50, 100, 250, 500])
    ax.get_yaxis().set_major_formatter(plt.ScalarFormatter())
    ax.set_ylim(bottom=1)

    # Set the labels and title
    ax.set_xlabel('Date')
    ax.set_title(f"PM2.5 Predicted (Logarithmic Scale) for {city}, {street}")
    ax.set_ylabel('PM2.5')

    colors = ['green', 'yellow', 'orange', 'red', 'purple', 'darkred']
    labels = ['Good', 'Moderate', 'Unhealthy for Some', 'Unhealthy', 'Very Unhealthy', 'Hazardous']
    ranges = [(0, 49), (50, 99), (100, 149), (150, 199), (200, 299), (300, 500)]
    for color, (start, end) in zip(colors, ranges):
        ax.axhspan(start, end, color=color, alpha=0.3)

    # Add a legend for the different air quality categories
    patches = [Patch(color=colors[i], label=f"{labels[i]}: {ranges[i][0]}-{ranges[i][1]}") for i in range(len(colors))]
    legend1 = ax.legend(handles=patches, loc='upper right', title="Air Quality Categories", fontsize='x-small')

    # Aim for ~10 annotated values on the x-axis; works for both forecasts and hindcasts
    if len(df.index) > 11:
        every_x_tick = len(df.index) / 10
        ax.xaxis.set_major_locator(MultipleLocator(every_x_tick))
    plt.xticks(rotation=45)

    if hindcast:
        ax.plot(day, df['pm25'], label='Actual PM2.5', color='black', linewidth=2,
                marker='^', markersize=5, markerfacecolor='grey')
        legend2 = ax.legend(loc='upper left', fontsize='x-small')
        ax.add_artist(legend1)

    # Ensure everything is laid out neatly
    plt.tight_layout()

    # Save the figure, overwriting any existing file with the same name
    plt.savefig(file_path)
    return plt
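
# Example usage (a sketch; the DataFrame needs 'date' and 'predicted_pm25' columns,
# plus 'pm25' when hindcast=True, and the output path is a placeholder):
#   plot_air_quality_forecast("stockholm", "hornsgatan-108", forecast_df, "pm25_forecast.png")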

def delete_feature_groups(fs, name):
    try:
        for fg in fs.get_feature_groups(name):
            fg.delete()
            print(f"Deleted {fg.name}/{fg.version}")
    except hsfs.client.exceptions.RestAPIError:
        print(f"No {name} feature group found")


def delete_feature_views(fs, name):
    try:
        for fv in fs.get_feature_views(name):
            fv.delete()
            print(f"Deleted {fv.name}/{fv.version}")
    except hsfs.client.exceptions.RestAPIError:
        print(f"No {name} feature view found")

def delete_models(mr, name):
    models = mr.get_models(name)
    if not models:
        print(f"No {name} model found")
    for model in models:
        model.delete()
        print(f"Deleted model {model.name}/{model.version}")


def delete_secrets(proj, name):
    secrets = secrets_api(proj.name)
    try:
        secret = secrets.get_secret(name)
        secret.delete()
        print(f"Deleted secret {name}")
    except hopsworks.client.exceptions.RestAPIError:
        print(f"No {name} secret found")

# WARNING - this will wipe out all your feature data and models
def purge_project(proj):
    fs = proj.get_feature_store()
    mr = proj.get_model_registry()

    # Delete Feature Views before deleting the Feature Groups
    delete_feature_views(fs, "air_quality_fv")

    # Delete ALL Feature Groups
    delete_feature_groups(fs, "air_quality")
    delete_feature_groups(fs, "weather")
    delete_feature_groups(fs, "aq_predictions")

    # Delete all Models
    delete_models(mr, "air_quality_xgboost_model")
    delete_secrets(proj, "SENSOR_LOCATION_JSON")
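
# Example usage (a sketch; assumes HOPSWORKS_API_KEY is set and you really do want to
# delete all the feature groups, feature views, models and secrets listed above):
#   project = hopsworks.login()
#   purge_project(project)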

def secrets_api(proj):
    host = "c.app.hopsworks.ai"
    api_key = os.environ.get('HOPSWORKS_API_KEY')
    conn = hopsworks.connection(host=host, project=proj, api_key_value=api_key)
    return conn.get_secrets_api()

def check_file_path(file_path):
    my_file = Path(file_path)
    if not my_file.is_file():
        print(f"Error. File not found at the path: {file_path}")
    else:
        print(f"File successfully found at the path: {file_path}")

def backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, model):
    # Predict PM2.5 for the 10 most recent days of weather features
    features_df = weather_fg.read()
    features_df = features_df.sort_values(by=['date'], ascending=True)
    features_df = features_df.tail(10)
    features_df['predicted_pm25'] = model.predict(
        features_df[['temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max', 'wind_direction_10m_dominant']])

    # Align the date dtypes (timezone-naive datetime64[ns]) before joining on date
    air_quality_df['date'] = pd.to_datetime(air_quality_df['date'])
    features_df['date'] = features_df['date'].dt.tz_convert(None).astype('datetime64[ns]')
    df = pd.merge(features_df, air_quality_df[['date', 'pm25', 'street', 'country']], on="date")
    df['days_before_forecast_day'] = 1
    hindcast_df = df

    # Drop the observed pm25 before inserting into the monitoring feature group,
    # but keep it in the returned hindcast DataFrame
    df = df.drop('pm25', axis=1)
    monitor_fg.insert(df, write_options={"wait_for_job": True})
    return hindcast_df
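
# Example usage (a sketch; the feature group names and version are assumptions based on
# the names used in purge_project above, and `model` is a trained regressor on the four
# weather features):
#   project = hopsworks.login()
#   fs = project.get_feature_store()
#   weather_fg = fs.get_feature_group(name="weather", version=1)
#   monitor_fg = fs.get_feature_group(name="aq_predictions", version=1)
#   hindcast_df = backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, model)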