# Air_Quality_Index / functions.py
# adjoint-bass's picture
# update
# af5a939
# raw
# history blame
# 5.7 kB
import requests
import os
import pandas as pd
import datetime
import numpy as np
from sklearn.preprocessing import OrdinalEncoder
from dotenv import load_dotenv
load_dotenv()
## TODO: write a function to display the color coding of the categories, both in the df and as a guide.
# Something like:
def color_aq(val):
    """Return a CSS ``background-color`` declaration for ``val``.

    A truthy ``val`` yields green; a falsy one yields red.
    """
    if val:
        shade = 'green'
    else:
        shade = 'red'
    return 'background-color: ' + shade
# but better
def get_air_quality_data(station_name):
    """Fetch the air-quality forecast of one station from api.waqi.info.

    The API token is read from the ``AIR_QUALITY_API_KEY`` environment
    variable.

    Args:
        station_name: Station identifier inserted into the feed URL.

    Returns:
        list: ``[date, pm25, pm10, aqi]`` where ``date`` is the first ten
        characters of the response's time string, ``pm25`` / ``pm10`` are the
        integer ``avg`` values of the first daily forecast entry, and ``aqi``
        is the larger of those two.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    AIR_QUALITY_API_KEY = os.getenv('AIR_QUALITY_API_KEY')
    request_value = f'https://api.waqi.info/feed/{station_name}/?token={AIR_QUALITY_API_KEY}'

    # Without a timeout, requests waits indefinitely on an unresponsive server.
    answer = requests.get(request_value, timeout=30).json()["data"]

    date = answer["time"]["s"][:10]
    forecast = answer['forecast']['daily']
    # NOTE(review): entry 0 is assumed to belong to the reported date --
    # confirm against the API response.
    pm25 = int(forecast['pm25'][0]['avg'])
    pm10 = int(forecast['pm10'][0]['avg'])

    return [date, pm25, pm10, max(pm25, pm10)]
def get_air_quality_df(data):
    """Turn one ``[date, pm25, pm10, aqi]`` record into a one-row DataFrame.

    The record is laid out as a single row, the columns are named
    ``date``, ``pm25``, ``pm10`` and ``aqi``, and the three measurement
    columns are converted with ``pd.to_numeric``.
    """
    frame = pd.DataFrame(data).T
    frame.columns = ['date', 'pm25', 'pm10', 'aqi']

    # 'date' stays as delivered; only the measurements become numeric.
    for measurement in ('pm25', 'pm10', 'aqi'):
        frame[measurement] = pd.to_numeric(frame[measurement])

    return frame
def get_weather_data_daily(city):
    """Fetch today's weather record for ``city`` from Visual Crossing.

    The API key is read from the ``WEATHER_API_KEY`` environment variable;
    the request asks for metric units and day-level data only.

    Args:
        city: Location inserted into the timeline URL.

    Returns:
        list: The lower-cased ``address`` of the response followed by the
        values of the first entry of ``days``, in the order of
        ``daily_fields`` below (25 items in total).

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    WEATHER_API_KEY = os.getenv('WEATHER_API_KEY')
    url = f'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{city}/today?unitGroup=metric&include=days&key={WEATHER_API_KEY}&contentType=json'

    # Without a timeout, requests waits indefinitely on an unresponsive server.
    answer = requests.get(url, timeout=30).json()
    data = answer['days'][0]

    # Order matters: it defines the positions of the values in the result.
    daily_fields = (
        'datetime',
        'tempmax',
        'tempmin',
        'temp',
        'feelslikemax',
        'feelslikemin',
        'feelslike',
        'dew',
        'humidity',
        'precip',
        'precipprob',
        'precipcover',
        'snow',
        'snowdepth',
        'windgust',
        'windspeed',
        'winddir',
        'pressure',
        'cloudcover',
        'visibility',
        'solarradiation',
        'solarenergy',
        'uvindex',
        'conditions',
    )
    return [answer['address'].lower()] + [data[field] for field in daily_fields]
def get_weather_data_weekly(city: str, start_date: datetime.date) -> pd.DataFrame:
    """Fetch up to seven days of weather for ``city`` from Visual Crossing.

    The requested range starts at ``start_date`` and ends six days later.
    The API key is read from the ``WEATHER_API_KEY`` environment variable.

    Args:
        city: Location inserted into the timeline URL.
        start_date: First day of the range (``datetime.date`` or
            ``datetime.datetime``; only the calendar date is used).

    Returns:
        pd.DataFrame: One row per returned day (at most seven), each built by
        ``get_weather_df``. Row labels are not renumbered. An empty DataFrame
        is returned when the response contains no days.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    WEATHER_API_KEY = os.getenv('WEATHER_API_KEY')

    # Format both ends the same way; a bare datetime.datetime would otherwise
    # put a space and a time of day into the URL path.
    first_day = f"{start_date:%Y-%m-%d}"
    end_date = f"{start_date + datetime.timedelta(days=6):%Y-%m-%d}"

    # Without a timeout, requests waits indefinitely on an unresponsive server.
    answer = requests.get(f'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{city}/{first_day}/{end_date}?unitGroup=metric&include=days&key={WEATHER_API_KEY}&contentType=json', timeout=30).json()

    address = answer['address'].lower()

    # Order matters: it must match the column order used by get_weather_df.
    daily_fields = (
        'datetime',
        'tempmax',
        'tempmin',
        'temp',
        'feelslikemax',
        'feelslikemin',
        'feelslike',
        'dew',
        'humidity',
        'precip',
        'precipprob',
        'precipcover',
        'snow',
        'snowdepth',
        'windgust',
        'windspeed',
        'winddir',
        'pressure',
        'cloudcover',
        'visibility',
        'solarradiation',
        'solarenergy',
        'uvindex',
        'conditions',
    )

    # Iterate over the days actually returned (capped at seven) instead of
    # indexing 0..6, so a shorter response no longer raises IndexError.
    daily_frames = [
        get_weather_df([address] + [day[field] for field in daily_fields])
        for day in answer['days'][:7]
    ]
    if not daily_frames:
        return pd.DataFrame()

    # A single concat avoids re-copying the accumulated frame on every
    # iteration and does not involve an empty seed frame.
    return pd.concat(daily_frames)
def get_weather_df(data):
    """Turn one 25-item weather record into a one-row DataFrame.

    The record is laid out as a single row and labelled with the column
    names below. Every column except ``name``, ``date`` and ``conditions``
    is converted with ``pd.to_numeric``.
    """
    text_columns = ('name', 'date', 'conditions')
    col_names = [
        'name',
        'date',
        'tempmax',
        'tempmin',
        'temp',
        'feelslikemax',
        'feelslikemin',
        'feelslike',
        'dew',
        'humidity',
        'precip',
        'precipprob',
        'precipcover',
        'snow',
        'snowdepth',
        'windgust',
        'windspeed',
        'winddir',
        'pressure',
        'cloudcover',
        'visibility',
        'solarradiation',
        'solarenergy',
        'uvindex',
        'conditions',
    ]

    frame = pd.DataFrame(data).T
    frame.columns = col_names

    numeric_columns = [label for label in col_names if label not in text_columns]
    for label in numeric_columns:
        frame[label] = pd.to_numeric(frame[label])

    return frame
def data_encoder(X):
    """Prepare a weather DataFrame for modelling, modifying it in place.

    The ``date`` and ``name`` columns are removed from ``X`` itself, and the
    ``conditions`` column is replaced by its ordinal encoding. The same
    (mutated) DataFrame object is returned.
    """
    X.drop(columns=['date', 'name'], inplace=True)

    # A new encoder is fitted on every call, using only the values in X.
    encoder = OrdinalEncoder()
    encoded_conditions = encoder.fit_transform(X[['conditions']])
    X['conditions'] = encoded_conditions

    return X
def get_aplevel(temps: np.ndarray, table: list):
    """Map each value to the ``table`` entry of its pollution bracket.

    Brackets are delimited by 50, 100, 150, 200 and 300 with inclusive upper
    bounds: values up to 50 select ``table[0]``, values in (50, 100] select
    ``table[1]``, and so on; values above 300 select ``table[5]``.

    Args:
        temps: Values to classify. An ``(n, 1)`` column array or any other
            array-like; it is flattened in row-major order.
        table: Six entries, one per bracket.

    Returns:
        list: One entry of ``table`` per input value, in input order.
    """
    boundary_list = np.array([0, 50, 100, 150, 200, 300])
    values = np.asarray(temps).reshape(-1)

    # side='left' counts the upper bounds strictly below each value, which is
    # the bracket index. The leading 0 is skipped so that values <= 0 land in
    # the first bracket; previously they produced no entry at all, leaving the
    # result shorter than (and misaligned with) the input.
    brackets = np.searchsorted(boundary_list[1:], values, side='left')

    return [table[int(idx)] for idx in brackets]
def get_info(level: list):
    """Return CSS colour strings and category names for pollution values.

    Each value is assigned to the first category whose upper bound is greater
    than or equal to it (50 -> Good, 100 -> Moderate, ...). Values above the
    last bound (1000) are reported as the last category.

    Args:
        level: Numeric values (plain Python numbers or NumPy scalars).

    Returns:
        tuple: ``(color_text, pollution_level)`` -- two lists with one entry
        per input value; ``color_text`` holds strings of the form
        ``"color:<name>;"`` and ``pollution_level`` the category names.
    """
    air_pollution_level = {1:[50, 'Good', 'Green'], 2:[100, 'Moderate','Yellow'], 3:[150, 'Unhealthy for sensitive Groups','DarkOrange'],4:[200, 'Unhealthy','Red'] ,5:[300, 'Very Unhealthy','Purple'], 6:[1000, 'Hazardous','DarkRed']}

    categories = sorted(air_pollution_level)
    upper_bounds = [air_pollution_level[key][0] for key in categories]

    values = np.asarray(level, dtype=float).reshape(-1)
    # Position of the first upper bound >= value. The position is translated
    # to a dictionary key through `categories`; using the 0-based position as
    # a key directly raised KeyError for 0 and shifted every other label down
    # by one category.
    positions = np.searchsorted(upper_bounds, values, side='left')
    # Values beyond the last bound stay in the last category.
    positions = np.minimum(positions, len(categories) - 1)
    keys = [categories[int(pos)] for pos in positions]

    color_text = [f"color:{air_pollution_level[key][2]};" for key in keys]
    pollution_level = [air_pollution_level[key][1] for key in keys]
    return color_text, pollution_level