# NOTE: "Spaces: Build error / Build error" -- hosting-page residue captured with this file; not part of the module.
import datetime
import json
import os
from itertools import repeat

import ee
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import yaml

from utils import duckdb_queries as dq

from . import logging
# Service-account e-mail handed to ee.ServiceAccountCredentials when
# authenticating to Google Earth Engine.
GEE_SERVICE_ACCOUNT = (
    "climatebase-july-2023@ee-geospatialml-aquarry.iam.gserviceaccount.com"
)
class IndexGenerator: | |
""" | |
A class to generate indices and compute zonal means. | |
Args: | |
indices (string[], required): Array of index names to include in aggregate index generation. | |
""" | |
def __init__(self): | |
# Authenticate to GEE & DuckDB | |
self._authenticate_ee(GEE_SERVICE_ACCOUNT) | |
self.roi = None | |
self.project_name = None | |
self.project_geometry = None | |
self.project_centroid = None | |
self.indices = None | |
self.metric_name = None | |
def set_metric(self, metric_name): | |
# Use defined subset of indices | |
indices_file = f'metrics/{metric_name.replace(" ", "_")}.yaml' | |
self.indices = self._load_indices(indices_file) | |
self.metric_name = metric_name | |
def set_project(self, project_name): | |
self.project_name = project_name | |
self.project_geometry = dq.get_project_geometry(self.project_name) | |
self.project_centroid = dq.get_project_centroid(self.project_name) | |
# to-do: refactor to involve fewer transformations | |
_polygon = json.dumps( | |
json.loads(self.project_geometry[0][0])["features"][0]["geometry"] | |
) | |
# to-do: don't use self.roi and instead pass patameter strategically | |
self.roi = ee.Geometry.Polygon(json.loads(_polygon)["coordinates"]) | |
def _cloudfree(self, gee_path, daterange): | |
""" | |
Internal method to generate a cloud-free composite. | |
Args: | |
gee_path (str): The path to the Google Earth Engine (GEE) image or image collection. | |
Returns: | |
ee.Image: The cloud-free composite clipped to the region of interest. | |
""" | |
# Load a raw Landsat ImageCollection for a single year. | |
collection = ( | |
ee.ImageCollection(gee_path).filterDate(*daterange).filterBounds(self.roi) | |
) | |
# Create a cloud-free composite with custom parameters for cloud score threshold and percentile. | |
composite_cloudfree = ee.Algorithms.Landsat.simpleComposite( | |
**{"collection": collection, "percentile": 75, "cloudScoreRange": 5} | |
) | |
return composite_cloudfree.clip(self.roi) | |
def _load_indices(indices_file): | |
# Read index configurations | |
with open(indices_file, "r") as stream: | |
try: | |
return yaml.safe_load(stream) | |
except yaml.YAMLError as e: | |
logging.error(e) | |
return None | |
def generate_index(self, index_config, year): | |
""" | |
Generates an index based on the provided index configuration. | |
Args: | |
index_config (dict): Configuration for generating the index. | |
Returns: | |
ee.Image: The generated index clipped to the region of interest. | |
""" | |
# Calculate date range, assume 1 year | |
start_date = str(datetime.date(year, 1, 1)) | |
end_date = str(datetime.date(year, 12, 31)) | |
daterange = [start_date, end_date] | |
# Calculate index based on type | |
logging.info( | |
f"Generating index: {index_config['name']} of type {index_config['gee_type']}" | |
) | |
match index_config["gee_type"]: | |
case "image": | |
dataset = ee.Image(index_config["gee_path"]).clip(self.roi) | |
if index_config.get("select"): | |
dataset = dataset.select(index_config["select"]) | |
case "image_collection": | |
dataset = ( | |
ee.ImageCollection(index_config["gee_path"]) | |
.filterBounds(self.roi) | |
.map(lambda image: image.clip(self.roi)) | |
.mean() | |
) | |
if index_config.get("select"): | |
dataset = dataset.select(index_config["select"]) | |
case "feature_collection": | |
dataset = ( | |
ee.Image() | |
.float() | |
.paint( | |
ee.FeatureCollection(index_config["gee_path"]), | |
index_config["select"], | |
) | |
.clip(self.roi) | |
) | |
case "algebraic": | |
image = self._cloudfree(index_config["gee_path"], daterange) | |
# to-do: params should come from index_config | |
dataset = image.normalizedDifference(["B4", "B3"]) | |
case _: | |
dataset = None | |
if not dataset: | |
raise Exception("Failed to generate dataset.") | |
# Normalize to a range of [0, 1] | |
min_val = 0 | |
max_val = 1 | |
if type(index_config['min'])==int or type(index_config['min']==float): | |
min_val = index_config['min'] | |
if str(index_config['max'])=='roi_area': | |
max_val = self.roi.area().getInfo() # in m^2 | |
elif type(index_config['max'])==int or type(index_config['max']==float): | |
max_val = index_config['max'] | |
dataset.subtract(min_val)\ | |
.divide(max_val - min_val) | |
logging.info(f"Generated index: {index_config['name']}") | |
return dataset | |
def zonal_mean_index(self, index_key, year): | |
index_config = self.indices[index_key] | |
dataset = self.generate_index(index_config, year) | |
logging.info(f"Calculating zonal mean for {index_key}...") | |
out = dataset.reduceRegion( | |
**{ | |
"reducer": ee.Reducer.mean(), | |
"geometry": self.roi, | |
"scale": 2000, # map scale | |
"bestEffort": True, | |
"maxPixels": 1e3, | |
} | |
).getInfo() | |
if index_config.get("bandname"): | |
return out[index_config.get("bandname")] | |
logging.info(f"Calculated zonal mean for {index_key}.") | |
return out | |
def generate_composite_index_df(self, year): | |
data = { | |
"metric": self.metric_name, | |
"year": year, | |
"centroid": "", | |
"project_name": "", | |
"value": list(map(self.zonal_mean_index, self.indices, repeat(year))), | |
# to-do: calculate with duckdb; also, should be part of project table instead | |
"area": self.roi.area().getInfo(), # m^2 | |
"geojson": "", | |
"coefficient": list(map(lambda x: self.indices[x]['coefficient'], self.indices)) | |
} | |
logging.info("data", data) | |
df = pd.DataFrame(data) | |
return df | |
def _authenticate_ee(ee_service_account): | |
""" | |
Huggingface Spaces does not support secret files, therefore authenticate with an environment variable containing the JSON. | |
""" | |
logging.info("Authenticating to Google Earth Engine...") | |
credentials = ee.ServiceAccountCredentials( | |
ee_service_account, key_data=os.environ["ee_service_account"] | |
) | |
ee.Initialize(credentials) | |
logging.info("Authenticated to Google Earth Engine.") | |
def _calculate_yearly_index(self, years): | |
dfs = [] | |
logging.info(years) | |
# to-do: pararelize? | |
for year in years: | |
logging.info(year) | |
df = self.generate_composite_index_df(year) | |
dfs.append(df) | |
# Concatenate all dataframes | |
df_concat = pd.concat(dfs) | |
df_concat["centroid"] = str(self.project_centroid) | |
df_concat["project_name"] = self.project_name | |
df_concat["geojson"] = str(self.project_geometry) | |
return df_concat.round(2) | |
# h/t: https://community.plotly.com/t/dynamic-zoom-for-mapbox/32658/12\ | |
def _latlon_to_config(longitudes=None, latitudes=None): | |
"""Function documentation:\n | |
Basic framework adopted from Krichardson under the following thread: | |
https://community.plotly.com/t/dynamic-zoom-for-mapbox/32658/7 | |
# NOTE: | |
# THIS IS A TEMPORARY SOLUTION UNTIL THE DASH TEAM IMPLEMENTS DYNAMIC ZOOM | |
# in their plotly-functions associated with mapbox, such as go.Densitymapbox() etc. | |
Returns the appropriate zoom-level for these plotly-mapbox-graphics along with | |
the center coordinate tuple of all provided coordinate tuples. | |
""" | |
# Check whether both latitudes and longitudes have been passed, | |
# or if the list lenghts don't match | |
if (latitudes is None or longitudes is None) or ( | |
len(latitudes) != len(longitudes) | |
): | |
# Otherwise, return the default values of 0 zoom and the coordinate origin as center point | |
return 0, (0, 0) | |
# Get the boundary-box | |
b_box = {} | |
b_box["height"] = latitudes.max() - latitudes.min() | |
b_box["width"] = longitudes.max() - longitudes.min() | |
b_box["center"] = (np.mean(longitudes), np.mean(latitudes)) | |
# get the area of the bounding box in order to calculate a zoom-level | |
area = b_box["height"] * b_box["width"] | |
# * 1D-linear interpolation with numpy: | |
# - Pass the area as the only x-value and not as a list, in order to return a scalar as well | |
# - The x-points "xp" should be in parts in comparable order of magnitude of the given area | |
# - The zpom-levels are adapted to the areas, i.e. start with the smallest area possible of 0 | |
# which leads to the highest possible zoom value 20, and so forth decreasing with increasing areas | |
# as these variables are antiproportional | |
zoom = np.interp( | |
x=area, | |
xp=[0, 5**-10, 4**-10, 3**-10, 2**-10, 1**-10, 1**-5], | |
fp=[20, 15, 14, 13, 12, 7, 5], | |
) | |
# Finally, return the zoom level and the associated boundary-box center coordinates | |
return zoom, b_box["center"] | |
def show_project_map(self): | |
features = json.loads(self.project_geometry[0][0].replace("'", '"'))["features"] | |
geometry = features[0]["geometry"] | |
longitudes = np.array(geometry["coordinates"])[0, :, 0] | |
latitudes = np.array(geometry["coordinates"])[0, :, 1] | |
zoom, bbox_center = self._latlon_to_config(longitudes, latitudes) | |
fig = go.Figure( | |
go.Scattermapbox( | |
mode="markers", | |
lon=[bbox_center[0]], | |
lat=[bbox_center[1]], | |
marker={"size": 20, "color": ["cyan"]}, | |
) | |
) | |
fig.update_layout( | |
mapbox={ | |
"style": "satellite", | |
"accesstoken":os.environ['MAPBOX_ACCESS_TOKEN'], | |
"center": {"lon": bbox_center[0], "lat": bbox_center[1]}, | |
"zoom": zoom, | |
"layers": [ | |
{ | |
"source": { | |
"type": "FeatureCollection", | |
"features": [{"type": "Feature", "geometry": geometry}], | |
}, | |
"type": "fill", | |
"below": "traces", | |
"color": "royalblue", | |
"opacity": 0.5, | |
} | |
], | |
}, | |
margin={"l": 0, "r": 0, "b": 0, "t": 0}, | |
) | |
return fig | |
def calculate_score(self, start_year, end_year): | |
years = [] | |
# Create `bioindicator` table IF NOT EXISTS. | |
dq.get_or_create_bioindicator_table() | |
for year in range(start_year, end_year+1): | |
row_exists = dq.check_if_project_exists_for_year(self.project_name, year) | |
if not row_exists: | |
years.append(year) | |
if len(years) > 0: | |
df = self._calculate_yearly_index(years) | |
# Write score table to `_temptable` | |
dq.write_score_to_temptable(df) | |
# UPSERT project record | |
dq.upsert_project_record() | |
logging.info("upserted records into motherduck") | |
scores = dq.get_project_scores(self.project_name, start_year, end_year) | |
scores.columns = scores.columns.str.replace('_', ' ').str.title() | |
if 'Area' in scores.columns: | |
scores['Area'] /= 1000**2 | |
scores.rename(columns={'Area':'Area (km^2)'}, inplace=True) | |
if 'Score' in scores.columns: | |
scores['Score'] /= 1000**2 | |
scores.rename(columns={'Score': 'Score (Area * Value)'}, inplace=True) | |
# Round scores to 4 significant figures | |
scores = scores.apply( | |
lambda x: ['%.4g'%x_i for x_i in x] | |
if pd.api.types.is_numeric_dtype(x) | |
else x) | |
return scores | |
def get_metric_file(self): | |
# Use defined subset of indices | |
indices_file = f'metrics/{self.metric_name.replace(" ", "_")}.yaml' | |
with open(indices_file, "r") as stream: | |
return stream.read() |