# calculator/utils/indicators.py
import datetime
import json
import os
from itertools import repeat
import ee
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import yaml
from utils import duckdb_queries as dq
from . import logging
GEE_SERVICE_ACCOUNT = (
"climatebase-july-2023@ee-geospatialml-aquarry.iam.gserviceaccount.com"
)
class IndexGenerator:
"""
A class to generate indices and compute zonal means.

    The subset of indices used is loaded from a metric-specific YAML file via
    `set_metric`, and the project region of interest is set via `set_project`.
"""
def __init__(self):
        # Authenticate to Google Earth Engine
self._authenticate_ee(GEE_SERVICE_ACCOUNT)
self.roi = None
self.project_name = None
self.project_geometry = None
self.project_centroid = None
self.indices = None
self.metric_name = None
def set_metric(self, metric_name):
        # Load the subset of indices defined for this metric from its YAML config
indices_file = f'metrics/{metric_name.replace(" ", "_")}.yaml'
self.indices = self._load_indices(indices_file)
self.metric_name = metric_name
def set_project(self, project_name):
self.project_name = project_name
self.project_geometry = dq.get_project_geometry(self.project_name)
self.project_centroid = dq.get_project_centroid(self.project_name)
# to-do: refactor to involve fewer transformations
_polygon = json.dumps(
json.loads(self.project_geometry[0][0])["features"][0]["geometry"]
)
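        # A sketch of the structure the parsing above assumes: `project_geometry`
        # is a one-row result whose first field is a GeoJSON FeatureCollection
        # string, roughly (illustrative values only):
        #   {"type": "FeatureCollection",
        #    "features": [{"type": "Feature",
        #                  "geometry": {"type": "Polygon",
        #                               "coordinates": [[[lon, lat], ...]]}}]}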
        # to-do: don't use self.roi and instead pass parameter strategically
self.roi = ee.Geometry.Polygon(json.loads(_polygon)["coordinates"])
def _cloudfree(self, gee_path, daterange):
"""
Internal method to generate a cloud-free composite.
Args:
            gee_path (str): The path to the Google Earth Engine (GEE) image or image collection.
            daterange (list[str]): Start and end dates ("YYYY-MM-DD") used to filter the collection.
Returns:
ee.Image: The cloud-free composite clipped to the region of interest.
"""
# Load a raw Landsat ImageCollection for a single year.
collection = (
ee.ImageCollection(gee_path).filterDate(*daterange).filterBounds(self.roi)
)
# Create a cloud-free composite with custom parameters for cloud score threshold and percentile.
composite_cloudfree = ee.Algorithms.Landsat.simpleComposite(
**{"collection": collection, "percentile": 75, "cloudScoreRange": 5}
)
return composite_cloudfree.clip(self.roi)
@staticmethod
def _load_indices(indices_file):
# Read index configurations
with open(indices_file, "r") as stream:
try:
return yaml.safe_load(stream)
except yaml.YAMLError as e:
logging.error(e)
return None
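    # A sketch of the YAML structure `_load_indices` expects, inferred from how
    # `generate_index` and `generate_composite_index_df` read it. The example key
    # and values below are assumptions for illustration, not a definitive schema:
    #
    #   NDVI:
    #     name: NDVI
    #     gee_type: algebraic        # image | image_collection | feature_collection | algebraic
    #     gee_path: LANDSAT/LT05/C02/T1
    #     select: null               # optional band/property selection
    #     min: 0                     # optional normalization bounds; "roi_area" is accepted for max
    #     max: 1
    #     coefficient: 0.5           # weight applied when aggregating indices
    #     bandname: nd               # optional band to extract from the zonal mean result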
def generate_index(self, index_config, year):
"""
Generates an index based on the provided index configuration.
Args:
            index_config (dict): Configuration for generating the index.
            year (int): Calendar year used to build the date range for time-filtered datasets.
Returns:
ee.Image: The generated index clipped to the region of interest.
"""
# Calculate date range, assume 1 year
start_date = str(datetime.date(year, 1, 1))
end_date = str(datetime.date(year, 12, 31))
daterange = [start_date, end_date]
# Calculate index based on type
logging.info(
f"Generating index: {index_config['name']} of type {index_config['gee_type']}"
)
match index_config["gee_type"]:
case "image":
dataset = ee.Image(index_config["gee_path"]).clip(self.roi)
if index_config.get("select"):
dataset = dataset.select(index_config["select"])
case "image_collection":
dataset = (
ee.ImageCollection(index_config["gee_path"])
.filterBounds(self.roi)
.map(lambda image: image.clip(self.roi))
.mean()
)
if index_config.get("select"):
dataset = dataset.select(index_config["select"])
case "feature_collection":
dataset = (
ee.Image()
.float()
.paint(
ee.FeatureCollection(index_config["gee_path"]),
index_config["select"],
)
.clip(self.roi)
)
case "algebraic":
image = self._cloudfree(index_config["gee_path"], daterange)
# to-do: params should come from index_config
dataset = image.normalizedDifference(["B4", "B3"])
case _:
dataset = None
        if dataset is None:
            raise ValueError(f"Failed to generate dataset for gee_type: {index_config['gee_type']}")
# Normalize to a range of [0, 1]
min_val = 0
max_val = 1
        if isinstance(index_config.get("min"), (int, float)):
            min_val = index_config["min"]
        if str(index_config.get("max")) == "roi_area":
            max_val = self.roi.area().getInfo()  # in m^2
        elif isinstance(index_config.get("max"), (int, float)):
            max_val = index_config["max"]
        # ee.Image operations return new images, so the result must be reassigned
        dataset = dataset.subtract(min_val).divide(max_val - min_val)
logging.info(f"Generated index: {index_config['name']}")
return dataset
def zonal_mean_index(self, index_key, year):
index_config = self.indices[index_key]
dataset = self.generate_index(index_config, year)
logging.info(f"Calculating zonal mean for {index_key}...")
out = dataset.reduceRegion(
**{
"reducer": ee.Reducer.mean(),
"geometry": self.roi,
"scale": 2000, # map scale
"bestEffort": True,
"maxPixels": 1e3,
}
).getInfo()
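        # `getInfo()` yields a plain dict keyed by band name,
        # e.g. {"nd": 0.42} (illustrative value only)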
if index_config.get("bandname"):
return out[index_config.get("bandname")]
logging.info(f"Calculated zonal mean for {index_key}.")
return out
def generate_composite_index_df(self, year):
data = {
"metric": self.metric_name,
"year": year,
"centroid": "",
"project_name": "",
"value": list(map(self.zonal_mean_index, self.indices, repeat(year))),
# to-do: calculate with duckdb; also, should be part of project table instead
"area": self.roi.area().getInfo(), # m^2
"geojson": "",
"coefficient": list(map(lambda x: self.indices[x]['coefficient'], self.indices))
}
logging.info("data", data)
df = pd.DataFrame(data)
return df
@staticmethod
def _authenticate_ee(ee_service_account):
"""
        Hugging Face Spaces does not support secret files, so we authenticate with an environment variable containing the service-account JSON key.
"""
logging.info("Authenticating to Google Earth Engine...")
credentials = ee.ServiceAccountCredentials(
ee_service_account, key_data=os.environ["ee_service_account"]
)
ee.Initialize(credentials)
logging.info("Authenticated to Google Earth Engine.")
def _calculate_yearly_index(self, years):
dfs = []
        logging.info(f"Calculating composite indices for years: {years}")
        # to-do: parallelize?
for year in years:
logging.info(year)
df = self.generate_composite_index_df(year)
dfs.append(df)
# Concatenate all dataframes
df_concat = pd.concat(dfs)
df_concat["centroid"] = str(self.project_centroid)
df_concat["project_name"] = self.project_name
df_concat["geojson"] = str(self.project_geometry)
return df_concat.round(2)
    # h/t: https://community.plotly.com/t/dynamic-zoom-for-mapbox/32658/12
@staticmethod
def _latlon_to_config(longitudes=None, latitudes=None):
"""Function documentation:\n
Basic framework adopted from Krichardson under the following thread:
https://community.plotly.com/t/dynamic-zoom-for-mapbox/32658/7
# NOTE:
# THIS IS A TEMPORARY SOLUTION UNTIL THE DASH TEAM IMPLEMENTS DYNAMIC ZOOM
# in their plotly-functions associated with mapbox, such as go.Densitymapbox() etc.
Returns the appropriate zoom-level for these plotly-mapbox-graphics along with
the center coordinate tuple of all provided coordinate tuples.
"""
        # If either latitudes or longitudes is missing, or the array lengths don't
        # match, return the default zoom of 0 centered on the coordinate origin.
        if (latitudes is None or longitudes is None) or (
            len(latitudes) != len(longitudes)
        ):
            return 0, (0, 0)
# Get the boundary-box
b_box = {}
b_box["height"] = latitudes.max() - latitudes.min()
b_box["width"] = longitudes.max() - longitudes.min()
b_box["center"] = (np.mean(longitudes), np.mean(latitudes))
# get the area of the bounding box in order to calculate a zoom-level
area = b_box["height"] * b_box["width"]
        # 1D linear interpolation with numpy:
        # - Pass the area as a scalar x-value (not a list) so a scalar zoom is returned
        # - The x-points "xp" are chosen to span orders of magnitude comparable to the area
        # - Zoom level is inversely proportional to area: the smallest possible area (0)
        #   maps to the highest zoom (20), and zoom decreases as the area grows
zoom = np.interp(
x=area,
xp=[0, 5**-10, 4**-10, 3**-10, 2**-10, 1**-10, 1**-5],
fp=[20, 15, 14, 13, 12, 7, 5],
)
# Finally, return the zoom level and the associated boundary-box center coordinates
return zoom, b_box["center"]
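    # For example (illustrative coordinates only), a small bounding box yields a
    # fairly high zoom level centered on the mean coordinate:
    #   zoom, center = IndexGenerator._latlon_to_config(
    #       np.array([10.0, 10.1, 10.2]), np.array([50.0, 50.1, 50.2])
    #   )  # -> center ≈ (10.1, 50.1)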
def show_project_map(self):
features = json.loads(self.project_geometry[0][0].replace("'", '"'))["features"]
geometry = features[0]["geometry"]
longitudes = np.array(geometry["coordinates"])[0, :, 0]
latitudes = np.array(geometry["coordinates"])[0, :, 1]
zoom, bbox_center = self._latlon_to_config(longitudes, latitudes)
fig = go.Figure(
go.Scattermapbox(
mode="markers",
lon=[bbox_center[0]],
lat=[bbox_center[1]],
marker={"size": 20, "color": ["cyan"]},
)
)
fig.update_layout(
mapbox={
"style": "satellite",
"accesstoken":os.environ['MAPBOX_ACCESS_TOKEN'],
"center": {"lon": bbox_center[0], "lat": bbox_center[1]},
"zoom": zoom,
"layers": [
{
"source": {
"type": "FeatureCollection",
"features": [{"type": "Feature", "geometry": geometry}],
},
"type": "fill",
"below": "traces",
"color": "royalblue",
"opacity": 0.5,
}
],
},
margin={"l": 0, "r": 0, "b": 0, "t": 0},
)
return fig
def calculate_score(self, start_year, end_year):
years = []
# Create `bioindicator` table IF NOT EXISTS.
dq.get_or_create_bioindicator_table()
for year in range(start_year, end_year+1):
row_exists = dq.check_if_project_exists_for_year(self.project_name, year)
if not row_exists:
years.append(year)
if len(years) > 0:
df = self._calculate_yearly_index(years)
# Write score table to `_temptable`
dq.write_score_to_temptable(df)
# UPSERT project record
dq.upsert_project_record()
logging.info("upserted records into motherduck")
scores = dq.get_project_scores(self.project_name, start_year, end_year)
scores.columns = scores.columns.str.replace('_', ' ').str.title()
if 'Area' in scores.columns:
scores['Area'] /= 1000**2
scores.rename(columns={'Area':'Area (km^2)'}, inplace=True)
if 'Score' in scores.columns:
scores['Score'] /= 1000**2
scores.rename(columns={'Score': 'Score (Area * Value)'}, inplace=True)
# Round scores to 4 significant figures
scores = scores.apply(
lambda x: ['%.4g'%x_i for x_i in x]
if pd.api.types.is_numeric_dtype(x)
else x)
return scores
def get_metric_file(self):
        # Return the raw YAML config for the currently selected metric
indices_file = f'metrics/{self.metric_name.replace(" ", "_")}.yaml'
with open(indices_file, "r") as stream:
return stream.read()
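

# Minimal usage sketch (assumes GEE service-account credentials and a MotherDuck/
# DuckDB connection are configured, and that the metric and project names below
# exist; both names are illustrative only):
#
#   generator = IndexGenerator()
#   generator.set_metric("Biodiversity")
#   generator.set_project("Example Project")
#   scores = generator.calculate_score(2018, 2022)
#   fig = generator.show_project_map()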