|
"""Helpers dedicated to georeferencing duties""" |
|
import base64 |
|
import glob |
|
import json |
|
import os |
|
import zlib |
|
from math import log, tan, radians, cos, pi, floor, degrees, atan, sinh |
|
|
|
import rasterio |
|
|
|
from src import app_logger |
|
from src.utilities.constants import GEOJSON_SQUARE_TEMPLATE, OUTPUT_CRS_STRING, INPUT_CRS_STRING, SKIP_CONDITIONS_LIST |
|
from src.utilities.type_hints import ts_llist_float2, ts_geojson, ts_dict_str2b, ts_tuple_flat2, ts_tuple_flat4, \ |
|
ts_list_float4, ts_llist2, ts_tuple_int4, ts_ddict2 |
|
|
|
ZIPJSON_KEY = 'base64(zip(o))' |
|
|
|
|
|
def get_geojson_square_angles(bounding_box:ts_llist_float2, name:str="buffer", debug:bool=False) -> ts_geojson:
    """
    Create a geojson-like dict rectangle from the input latitude/longitude bounding box.

    Args:
        bounding_box: float latitude/longitude bounding box, read as
            [[top, right], [bottom, left]]
        name: geojson-like rectangle name
        debug: bool, default=False
            logging debug argument (currently unused)

    Returns:
        dict: geojson-like object rectangle
    """
    import copy

    app_logger.info(f"bounding_box:{bounding_box}.")
    top = bounding_box[0][0]
    right = bounding_box[0][1]
    bottom = bounding_box[1][0]
    left = bounding_box[1][1]
    bottom_left = [left, bottom]
    top_left = [left, top]
    top_right = [right, top]
    bottom_right = [right, bottom]
    coords = [bottom_left, top_left, top_right, bottom_right]
    app_logger.info(f"coords:{coords}.")
    # deepcopy is required: a shallow copy.copy() shares the nested "features"
    # structures with the module-level template, so writing the coordinates below
    # would mutate GEOJSON_SQUARE_TEMPLATE for every subsequent call.
    geojson = copy.deepcopy(GEOJSON_SQUARE_TEMPLATE)
    geojson["name"] = name
    geojson["features"][0]["geometry"]["coordinates"] = [[coords]]
    app_logger.info(f"geojson:{geojson}.")
    return geojson
|
|
|
|
|
def crop_raster(merged_raster_path:str, area_crop_geojson:dict, debug:bool=False) -> ts_dict_str2b:
    """
    Crop a raster using a geojson-like object rectangle.

    Args:
        merged_raster_path: filename path pointing string to the raster to crop
        area_crop_geojson: geojson-like object rectangle
        debug: bool, default=False
            logging debug argument

    Returns:
        dict: the cropped raster numpy array, its updated metadata and the transform
            object with the georeferencing reference
    """
    try:
        # rasterio is already imported at module level; only the mask helper is needed here
        from rasterio.mask import mask

        app_logger.info(f"area_crop_geojson::{area_crop_geojson}.")
        geojson_reprojected = get_geojson_reprojected(area_crop_geojson, debug=debug)
        shapes = [feature["geometry"] for feature in geojson_reprojected["features"]]
        app_logger.info(f"geojson_reprojected:{geojson_reprojected}.")

        app_logger.info(f"reading merged_raster_path while masking it from path:{merged_raster_path}.")
        with rasterio.open(merged_raster_path, "r") as src:
            masked_raster, masked_transform = mask(src, shapes, crop=True)
            masked_meta = src.meta
            app_logger.info(f"merged_raster_path, src:{src}.")
        masked_meta.update({
            "driver": "GTiff", "height": masked_raster.shape[1],
            "width": masked_raster.shape[2], "transform": masked_transform}
        )
        return {"masked_raster": masked_raster, "masked_meta": masked_meta, "masked_transform": masked_transform}
    except Exception as e:
        app_logger.error(e)
        # bare raise preserves the original traceback instead of re-raising via `raise e`
        raise
|
|
|
|
|
def get_geojson_reprojected(geojson:dict, output_crs:str=OUTPUT_CRS_STRING, debug:bool=False) -> dict:
    """
    Change projection for the input geojson-like object polygon.

    Args:
        geojson: input geojson-like object polygon
        output_crs: output crs string - Coordinate Reference Systems
        debug: logging debug argument

    Returns:
        dict: reprojected geojson-like object

    Raises:
        ValueError: when geojson is not a dict
        KeyError: when an expected geojson key is missing
    """
    if not isinstance(geojson, dict):
        raise ValueError(f"geojson here should be a dict, not of type {type(geojson)}.")
    app_logger.info(f"start reprojecting geojson:{geojson}.")
    try:
        features = geojson['features']

        output_crs_json = {"type": "name", "properties": {"name": f"urn:ogc:def:crs:{output_crs}"}}
        geojson_output = {'features': [], 'type': 'FeatureCollection', "name": "converted", "crs": output_crs_json}

        for feature in features:
            feature_out = feature.copy()
            new_coords = []
            feat = feature['geometry']
            app_logger.info(f"feat:{feat}.")
            coords = feat['coordinates']
            app_logger.info(f"coordinates:{coords}.")

            # MultiPolygon-style nesting: polygons -> rings -> [x, y] points
            for coord_a in coords:
                new_coords_a = []
                for cord_b in coord_a:
                    new_coords_b = []
                    for xconv, yconf in cord_b:
                        app_logger.info(f"xconv, yconf:{xconv},{yconf}.")
                        x2, y2 = latlon_to_mercator(xconv, yconf)
                        app_logger.info(f"x2, y2:{x2},{y2}.")
                        new_coords_b.append([x2, y2])
                    new_coords_a.append(new_coords_b)
                new_coords.append(new_coords_a)
            # build a fresh geometry dict: feature.copy() is shallow, so assigning into
            # feature_out['geometry']['coordinates'] would mutate the caller's input geojson
            feature_out['geometry'] = {**feat, 'coordinates': new_coords}
            geojson_output['features'].append(feature_out)
        app_logger.info(f"geojson_output:{geojson_output}.")
        return geojson_output
    except KeyError as ke_get_geojson_reprojected:
        msg = f"ke_get_geojson_reprojected:{ke_get_geojson_reprojected}."
        app_logger.error(msg)
        raise KeyError(msg)
|
|
|
|
|
def latlon_to_mercator(
        lat:float, lon:float, input_crs:str=INPUT_CRS_STRING, output_crs:str=OUTPUT_CRS_STRING, always_xy:bool=True, debug:bool=False
) -> ts_tuple_flat2:
    """
    Transform a latitude/longitude coordinate pair into the mercator reference system.

    Args:
        lat: input latitude float value
        lon: input longitude float value
        input_crs: string, input Coordinate Reference Systems
        output_crs: string, output Coordinate Reference Systems
        always_xy: bool, default=True.
            If true, the transform method will accept as input and return as output
            coordinates using the traditional GIS order, that is longitude, latitude
            for geographic CRS and easting, northing for most projected CRS.
        debug: bool, default=False.
            logging debug argument

    Returns:
        tuple latitude/longitude float values
    """
    try:
        from pyproj import Transformer

        app_logger.info(f"lat:{lat},lon:{lon}.")
        crs_transformer = Transformer.from_crs(input_crs, output_crs, always_xy=always_xy)
        transformed_point = crs_transformer.transform(lat, lon)
        out_lat, out_lon = transformed_point
        app_logger.info(f"out_lat:{out_lat},out_lon:{out_lon}.")
        return out_lat, out_lon
    except Exception as e_latlon_to_mercator:
        app_logger.error(f"e_latlon_to_mercator:{e_latlon_to_mercator}.")
        raise e_latlon_to_mercator
|
|
|
|
|
def sec(x:float) -> float:
    """
    Compute the secant (the reciprocal of the cosine) of the given value.

    Args:
        x: input float value

    Returns:
        float: secant of given float value
    """
    cosine = cos(x)
    return 1 / cosine
|
|
|
|
|
def latlon_to_xyz(lat:float, lon:float, z:int) -> ts_tuple_flat2:
    """
    Convert a latitude/longitude point into fractional x/y tile coordinates at zoom z.

    Args:
        lat: float latitude value
        lon: float longitude value
        z: int zoom value

    Returns:
        tuple: x, y values tiles coordinates
    """
    n_tiles = 2 ** z
    x_fraction = (lon + 180) / 360
    y_fraction = (1 - log(tan(radians(lat)) + sec(radians(lat))) / pi) / 2
    return n_tiles * x_fraction, n_tiles * y_fraction
|
|
|
|
|
def bbox_to_xyz(lon_min:float, lon_max:float, lat_min:float, lat_max:float, z:int) -> ts_tuple_flat4:
    """
    Compute xyz tile index bounds from latitude/longitude min and max values.

    Args:
        lon_min: float min longitude value
        lon_max: float max longitude value
        lat_min: float min latitude value
        lat_max: float max latitude value
        z: int zoom value

    Returns:
        tuple: float x min, x max, y min, y max values tiles coordinates
    """
    # note: min latitude maps to max tile y, since tile rows grow southwards
    x_min, y_max = latlon_to_xyz(lat_min, lon_min, z)
    x_max, y_min = latlon_to_xyz(lat_max, lon_max, z)
    tile_bounds = (floor(x_min), floor(x_max), floor(y_min), floor(y_max))
    return tile_bounds
|
|
|
|
|
def mercator_to_lat(mercator_y:float) -> float:
    """
    Convert a mercator y coordinate value back to a latitude value.

    Args:
        mercator_y: float mercator value coordinate

    Returns:
        float: latitude value coordinate
    """
    latitude_radians = atan(sinh(mercator_y))
    return degrees(latitude_radians)
|
|
|
|
|
def y_to_lat_edges(y:float, z:int) -> ts_tuple_flat2:
    """
    Compute the two edge latitude values for tile row y at zoom level z.

    Args:
        y: float y tile value coordinate
        z: int zoom tile value coordinate

    Returns:
        tuple: two float latitude values coordinates
    """
    unit = 1 / pow(2, z)
    relative_top = y * unit
    relative_bottom = relative_top + unit
    # map the relative tile positions through the inverse mercator projection
    lat_top = mercator_to_lat(pi * (1 - 2 * relative_top))
    lat_bottom = mercator_to_lat(pi * (1 - 2 * relative_bottom))
    return lat_top, lat_bottom
|
|
|
|
|
def x_to_lon_edges(x:float, z:int) -> ts_tuple_flat2:
    """
    Compute the two edge longitude values for tile column x at zoom level z.

    Args:
        x: float x tile value coordinate
        z: int zoom tile value coordinate

    Returns:
        tuple: two float longitude values coordinates
    """
    degrees_per_tile = 360 / pow(2, z)
    west_edge = -180 + x * degrees_per_tile
    east_edge = west_edge + degrees_per_tile
    return west_edge, east_edge
|
|
|
|
|
def tile_edges(x:float, y:float, z:int) -> ts_list_float4:
    """
    Compute edge float latitude/longitude value coordinates from xyz tiles coordinates.

    Args:
        x: float x tile value coordinate
        y: float y tile value coordinate
        z: int zoom tile value coordinate

    Returns:
        list: float longitude/latitude values coordinates [lon1, lat1, lon2, lat2]
    """
    lat_first, lat_second = y_to_lat_edges(y, z)
    lon_first, lon_second = x_to_lon_edges(x, z)
    return [lon_first, lat_first, lon_second, lat_second]
|
|
|
|
|
def merge_tiles(input_pattern:str, output_path:str, temp_dir:str, debug:bool=False) -> None:
    """
    Merge the rasters matching the given glob input pattern into one georeferenced raster.

    Builds an intermediate VRT in temp_dir, then translates it to output_path.

    Args:
        input_pattern: input glob pattern needed for search the raster filenames
        output_path: output path where to write the merged raster
        temp_dir: temporary folder needed for create
        debug: bool, default=False.
            logging debug argument
    """
    try:
        from osgeo import gdal
    except ModuleNotFoundError as module_error_merge_tiles:
        msg = f"module_error_merge_tiles:{module_error_merge_tiles}."
        app_logger.error(msg)
        raise module_error_merge_tiles

    try:
        vrt_path = os.path.join(temp_dir, "tiles.vrt")
        os_list_dir1 = os.listdir(temp_dir)
        app_logger.info(f"os_list_dir1:{os_list_dir1}.")

        tile_files = glob.glob(input_pattern)
        gdal.BuildVRT(vrt_path, tile_files)
        gdal.Translate(output_path, vrt_path)

        os_list_dir2 = os.listdir(temp_dir)
        app_logger.info(f"os_list_dir2:{os_list_dir2}.")
    except IOError as ioe_merge_tiles:
        msg = f"ioe_merge_tiles:{ioe_merge_tiles}."
        app_logger.error(msg)
        raise ioe_merge_tiles
|
|
|
|
|
def get_lat_lon_coords(bounding_box: ts_llist2) -> ts_tuple_int4:
    """
    Extract couples of float latitude/longitude values from a bounding box input list.

    Args:
        bounding_box: bounding box input list of latitude/longitude coordinates,
            read as [top right point, bottom left point]

    Returns:
        tuple: longitude min, latitude min, longitude max, latitude max values coordinates

    Raises:
        ValueError: when min and max latitude and/or longitude values coincide
    """
    (lat_max, lon_max), (lat_min, lon_min) = bounding_box
    is_degenerate = lon_min == lon_max or lat_min == lat_max
    if is_degenerate:
        raise ValueError(f"latitude and/or longitude coordinates should not be equal each others... {bounding_box}.")
    return lon_min, lat_min, lon_max, lat_max
|
|
|
|
|
def get_prediction_georeferenced(prediction_obj:dict, transform:rasterio.transform, skip_conditions_list:list=None, debug:bool=False) -> dict:
    """
    Build a georeferenced geojson-like object starting from a dict containing "predictions" -> "points" list.

    Apply the affine transform matrix of georeferenced raster submitted to the machine learning model.

    Args:
        prediction_obj: input dict
        transform: 'rasterio.transform' or dict list, affine tranform matrix
        skip_conditions_list: dict list, skip condition list
        debug: bool, default=False.
            logging debug argument

    Returns:
        dict
    """
    if skip_conditions_list is None:
        skip_conditions_list = SKIP_CONDITIONS_LIST

    app_logger.info(f"prediction_obj::{prediction_obj}, transform::{transform}.")
    crs = {"type": "name", "properties": {"name": "urn:ogc:def:crs:EPSG::3857"}}
    geojson_obj = {'features': [], 'type': 'FeatureCollection', "name": "geojson_name", "crs": crs}
    for n, prediction in enumerate(prediction_obj["predictions"]):
        raw_points = prediction["points"]
        points_list = [[point["x"], point["y"]] for point in raw_points]
        app_logger.info(f"points_list::{points_list}.")

        feature = populate_features_geojson(
            n, points_list, confidence=prediction["confidence"], geomorphic_class=prediction["class"]
        )
        app_logger.info(f"geojson::feature:{feature}.")
        # re-project the pixel-space geometry through the raster's affine transform
        feature["geometry"] = apply_transform(feature["geometry"], transform, debug=debug)
        geojson_obj["features"].append(feature)
    app_logger.info(f"geojson::post_update:{geojson_obj}.")
    return geojson_obj
|
|
|
|
|
def populate_features_geojson(idx: int, coordinates_list: list, **kwargs) -> ts_ddict2:
    """
    Wrap a list of coordinate points into a geojson-like feature-like object.

    Args:
        idx: int, feature index
        coordinates_list: dict list, coordinate points
        **kwargs: optional arguments to merge within the geojson properties feature

    Returns:
        dict
    """
    feature_properties = {"id": idx, **kwargs}
    feature_geometry = {
        "type": "MultiPolygon",
        "coordinates": [[coordinates_list]],
    }
    return {"type": "Feature", "properties": feature_properties, "geometry": feature_geometry}
|
|
|
|
|
def check_skip_conditions(prediction:dict, skip_conditions_list:list, debug:bool=False) -> bool:
    """
    Loop over elements within skip_conditions_list and return True if any condition says to skip.

    Previously the function returned from inside the loop on the first iteration, so only
    the first condition in skip_conditions_list was ever evaluated; now every condition is
    checked and the prediction is skipped as soon as any of them matches.

    Args:
        prediction: input dict to check
        skip_conditions_list: dict list with conditions to evaluate
        debug: bool, default=False
            logging debug argument

    Returns:
        bool: True when the prediction should be skipped, False otherwise
    """
    for obj in skip_conditions_list:
        if skip_feature(prediction, obj["skip_key"], obj["skip_value"], obj["skip_condition"], debug=debug):
            return True
    return False
|
|
|
|
|
def skip_feature(prediction:dict, skip_key:float, skip_value:str, skip_condition:str, debug:bool=False) -> bool: |
|
""" |
|
Return False if values from input dict shouldn't be skipped, |
|
True in case of exceptions, empty skip_condition or when chosen condition meets skip_value and skip_condition. |
|
|
|
E.g. confidence should be major than 0.8: if confidence is equal to 0.65 then return True (0.65 < 0.8) and skip! |
|
|
|
Args: |
|
prediction: input dict to check |
|
skip_key: skip condition key string |
|
skip_value: skip condition value string |
|
skip_condition: string (major | minor | equal) |
|
debug: bool, default=False |
|
logging debug argument |
|
|
|
Returns: |
|
bool |
|
|
|
""" |
|
|
|
|
|
try: |
|
v = prediction[skip_key] |
|
match skip_condition: |
|
case "major": |
|
return v > skip_value |
|
case "minor": |
|
return v < skip_value |
|
case "equal": |
|
return v == skip_value |
|
case "": |
|
return False |
|
except KeyError as ke_filter_feature: |
|
app_logger.error(f"ke_filter_feature:{ke_filter_feature}.") |
|
return False |
|
except Exception as e_filter_feature: |
|
app_logger.error(f"e_filter_feature:{e_filter_feature}.") |
|
return False |
|
|
|
|
|
def apply_transform(geometry:object, transform:list[object], debug:bool=False) -> dict:
    """
    Produce a GeoJSON-like mapping from a geometry transformed with an affine transformation matrix.

    The coefficient matrix is provided as a list or tuple with 6 items
    for 2D transformations. The 6 parameter matrix is::

        [a, b, d, e, xoff, yoff]

    which represents the augmented matrix::

        [x'] / a  b xoff \\ [x]
        [y'] = | d  e yoff | [y]
        [1 ] \\ 0  0    1 / [1]

    or the equations for the transformed coordinates::

        x' = a * x + b * y + xoff
        y' = d * x + e * y + yoff

    The transform argument may instead expose the six coefficients as attributes
    (a, b, d, e, xoff, yoff), in which case attribute access is tried first.

    Args:
        geometry: geometry value from a geojson dict
        transform: list of float values (affine transformation matrix)
        debug: bool, default=False
            logging debug argument

    Returns:
        dict
    """
    try:
        from shapely.affinity import affine_transform
        from shapely.geometry import mapping, shape
        try:
            # attribute-style coefficients first (e.g. an affine transform object)
            transformed_geom = affine_transform(
                shape(geometry),
                [transform.a, transform.b, transform.d, transform.e, transform.xoff, transform.yoff]
            )
        except AttributeError as ae:
            app_logger.warning(f"ae:{ae}.")
            # fall back to sequence-style coefficients
            transformed_geom = affine_transform(
                shape(geometry),
                [transform[0], transform[1], transform[2], transform[3], transform[4], transform[5]]
            )
        geometry_serialized = mapping(transformed_geom)
        app_logger.info(f"geometry_serialized:{geometry_serialized}.")
        return geometry_serialized
    except ImportError as ie_apply_transform:
        app_logger.error(f"ie_apply_transform:{ie_apply_transform}.")
        raise ie_apply_transform
    except Exception as e_apply_transform:
        app_logger.error(f"e_apply_transform:{e_apply_transform}.")
        raise e_apply_transform
|
|
|
|
|
def get_perc(nan_count:int, total_count:int) -> str:
    """
    Format the ratio between NaN elements and the total number of elements of a numpy array as a percentage string.

    Args:
        nan_count: NaN value elements
        total_count: total count of elements

    Returns:
        str: percentage value formatted with two decimal places
    """
    percentage = 100 * nan_count / total_count
    return f"{percentage:.2f}"
|
|
|
|
|
def json_unzip(j:dict, debug:bool=False) -> dict:
    """
    Return uncompressed content from input dict using 'zlib' library.

    Note: the return annotation previously said `str`, but the function returns the
    json-decoded object (a dict for the expected payloads), as the docstring stated.

    Args:
        j: input dict to uncompress. key must be 'base64(zip(o))' (ZIPJSON_KEY)
        debug: logging debug argument (currently unused)

    Returns:
        dict: uncompressed dict

    Raises:
        KeyError: when ZIPJSON_KEY is missing from the input dict
        RuntimeError: when the payload cannot be unzipped or json-decoded
    """
    from json import JSONDecodeError
    from zlib import error as zlib_error

    try:
        decompressed = zlib.decompress(base64.b64decode(j[ZIPJSON_KEY]))
    except KeyError as ke:
        ke_error_msg = f"Could not decode/unzip the content because of wrong/missing dict key:{ke}."
        raise KeyError(ke_error_msg)
    except zlib_error as zlib_error2:
        zlib_error2_msg = f"Could not decode/unzip the content because of:{zlib_error2}."
        app_logger.error(zlib_error2_msg)
        raise RuntimeError(zlib_error2_msg)

    try:
        return json.loads(decompressed)
    except JSONDecodeError as json_e1:
        msg = f"Could interpret the unzipped content because of JSONDecodeError with msg:{json_e1.msg}, pos:{json_e1.pos}, broken json:'{json_e1.doc}'"
        app_logger.error(msg)
        raise RuntimeError(msg)
|
|
|
|
|
def json_zip(j:dict) -> dict[str, str]:
    """
    Return compressed content from input dict using 'zlib' library.

    Note: the return annotation was the incomplete `dict[str]`; the function returns a
    single-key dict mapping ZIPJSON_KEY to a base64-encoded str.

    Args:
        j: input dict to compress

    Returns:
        dict: single-key dict mapping ZIPJSON_KEY to the base64-encoded,
            zlib-compressed json serialization of the input
    """
    compressed = zlib.compress(json.dumps(j).encode('utf-8'))
    return {ZIPJSON_KEY: base64.b64encode(compressed).decode('ascii')}
|
|