alessandro trinca tornidor
[feat] add option to dump temp images during ML recognition operations
7d48fbe
"""helpers for computer vision duties""" | |
import numpy as np | |
from numpy import ndarray, bitwise_not | |
from rasterio import open as rasterio_open | |
from samgis_lisa_on_cuda import PROJECT_ROOT_FOLDER | |
from samgis_lisa_on_cuda import app_logger | |
from samgis_lisa_on_cuda.utilities.constants import OUTPUT_CRS_STRING | |
from samgis_lisa_on_cuda.utilities.type_hints import XYZTerrainProvidersNames | |
def get_nextzen_terrain_rgb_formula(red: ndarray, green: ndarray, blue: ndarray) -> ndarray: | |
""" | |
Compute a 32-bits 2d digital elevation model from a nextzen 'terrarium' (terrain-rgb) raster. | |
'Terrarium' format PNG tiles contain raw elevation data in meters, in Mercator projection (EPSG:3857). | |
All values are positive with a 32,768 offset, split into the red, green, and blue channels, | |
with 16 bits of integer and 8 bits of fraction. To decode: | |
(red * 256 + green + blue / 256) - 32768 | |
More details on https://www.mapzen.com/blog/elevation/ | |
Args: | |
red: red-valued channel image array | |
green: green-valued channel image array | |
blue: blue-valued channel image array | |
Returns: | |
ndarray: nextzen 'terrarium' 2d digital elevation model raster at 32 bits | |
""" | |
return (red * 256 + green + blue / 256) - 32768 | |
def get_mapbox__terrain_rgb_formula(red: ndarray, green: ndarray, blue: ndarray) -> ndarray: | |
return ((red * 256 * 256 + green * 256 + blue) * 0.1) - 10000 | |
providers_terrain_rgb_formulas = { | |
XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME: get_mapbox__terrain_rgb_formula, | |
XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME: get_nextzen_terrain_rgb_formula | |
} | |
def _get_2d_array_from_3d(arr: ndarray) -> ndarray: | |
return arr.reshape(arr.shape[0], arr.shape[1]) | |
def _channel_split(arr: ndarray) -> list[ndarray]: | |
from numpy import dsplit | |
return dsplit(arr, arr.shape[-1]) | |
def get_raster_terrain_rgb_like(arr: ndarray, xyz_provider_name, nan_value_int: int = -12000): | |
""" | |
Compute a 32-bits 2d digital elevation model from a terrain-rgb raster. | |
Args: | |
arr: rgb raster | |
xyz_provider_name: xyz provider | |
nan_value_int: threshold int value to replace NaN | |
Returns: | |
ndarray: 2d digital elevation model raster at 32 bits | |
""" | |
red, green, blue = _channel_split(arr) | |
dem_rgb = providers_terrain_rgb_formulas[xyz_provider_name](red, green, blue) | |
output = _get_2d_array_from_3d(dem_rgb) | |
output[output < nan_value_int] = np.NaN | |
return output | |
def get_rgb_prediction_image(raster_cropped: ndarray, slope_cellsize: int, invert_image: bool = True) -> ndarray: | |
""" | |
Return an RGB image from input numpy array | |
Args: | |
raster_cropped: input numpy array | |
slope_cellsize: window size to calculate slope and curvature (1st and 2nd degree array derivative) | |
invert_image: | |
Returns: | |
tuple of str: image filename, image path (with filename) | |
""" | |
from samgis_lisa_on_cuda.utilities.constants import CHANNEL_EXAGGERATIONS_LIST | |
try: | |
slope, curvature = get_slope_curvature(raster_cropped, slope_cellsize=slope_cellsize) | |
channel0 = raster_cropped | |
channel1 = normalize_array_list( | |
[raster_cropped, slope, curvature], CHANNEL_EXAGGERATIONS_LIST, title="channel1_normlist") | |
channel2 = curvature | |
return get_rgb_image(channel0, channel1, channel2, invert_image=invert_image) | |
except ValueError as ve_get_rgb_prediction_image: | |
msg = f"ve_get_rgb_prediction_image:{ve_get_rgb_prediction_image}." | |
app_logger.error(msg) | |
raise ve_get_rgb_prediction_image | |
def get_rgb_image(arr_channel0: ndarray, arr_channel1: ndarray, arr_channel2: ndarray, | |
invert_image: bool = True) -> ndarray: | |
""" | |
Return an RGB image from input R,G,B channel arrays | |
Args: | |
arr_channel0: channel image 0 | |
arr_channel1: channel image 1 | |
arr_channel2: channel image 2 | |
invert_image: invert the RGB image channel order | |
Returns: | |
ndarray: RGB image | |
""" | |
try: | |
# RED curvature, GREEN slope, BLUE dem, invert_image=True | |
if len(arr_channel0.shape) != 2: | |
msg = f"arr_size, wrong type:{type(arr_channel0)} or arr_size:{arr_channel0.shape}." | |
app_logger.error(msg) | |
raise ValueError(msg) | |
data_rgb = np.zeros((arr_channel0.shape[0], arr_channel0.shape[1], 3), dtype=np.uint8) | |
app_logger.debug(f"arr_container data_rgb, type:{type(data_rgb)}, arr_shape:{data_rgb.shape}.") | |
data_rgb[:, :, 0] = normalize_array( | |
arr_channel0.astype(float), high=1, norm_type="float", title="RGB:channel0") * 64 | |
data_rgb[:, :, 1] = normalize_array( | |
arr_channel1.astype(float), high=1, norm_type="float", title="RGB:channel1") * 128 | |
data_rgb[:, :, 2] = normalize_array( | |
arr_channel2.astype(float), high=1, norm_type="float", title="RGB:channel2") * 192 | |
if invert_image: | |
app_logger.debug(f"data_rgb:{type(data_rgb)}, {data_rgb.dtype}.") | |
data_rgb = bitwise_not(data_rgb) | |
return data_rgb | |
except ValueError as ve_get_rgb_image: | |
msg = f"ve_get_rgb_image:{ve_get_rgb_image}." | |
app_logger.error(msg) | |
raise ve_get_rgb_image | |
def get_slope_curvature(dem: ndarray, slope_cellsize: int, title: str = "") -> tuple[ndarray, ndarray]: | |
""" | |
Return a tuple of two numpy arrays representing slope and curvature (1st grade derivative and 2nd grade derivative) | |
Args: | |
dem: input numpy array | |
slope_cellsize: window size to calculate slope and curvature | |
title: array name | |
Returns: | |
tuple of ndarrays: slope image, curvature image | |
""" | |
app_logger.info(f"dem shape:{dem.shape}, slope_cellsize:{slope_cellsize}.") | |
try: | |
dem = dem.astype(float) | |
app_logger.debug("get_slope_curvature:: start") | |
slope = calculate_slope(dem, slope_cellsize) | |
app_logger.debug("get_slope_curvature:: created slope raster") | |
s2c = calculate_slope(slope, slope_cellsize) | |
curvature = normalize_array(s2c, norm_type="float", title=f"SC:curvature_{title}") | |
app_logger.debug("get_slope_curvature:: created curvature raster") | |
return slope, curvature | |
except ValueError as ve_get_slope_curvature: | |
msg = f"ve_get_slope_curvature:{ve_get_slope_curvature}." | |
app_logger.error(msg) | |
raise ve_get_slope_curvature | |
def calculate_slope(dem_array: ndarray, cell_size: int, calctype: str = "degree") -> ndarray: | |
""" | |
Return a numpy array representing slope (1st grade derivative) | |
Args: | |
dem_array: input numpy array | |
cell_size: window size to calculate slope | |
calctype: calculus type | |
Returns: | |
ndarray: slope image | |
""" | |
try: | |
gradx, grady = np.gradient(dem_array, cell_size) | |
dem_slope = np.sqrt(gradx ** 2 + grady ** 2) | |
if calctype == "degree": | |
dem_slope = np.degrees(np.arctan(dem_slope)) | |
app_logger.debug(f"extracted slope with calctype:{calctype}.") | |
return dem_slope | |
except ValueError as ve_calculate_slope: | |
msg = f"ve_calculate_slope:{ve_calculate_slope}." | |
app_logger.error(msg) | |
raise ve_calculate_slope | |
def normalize_array(arr: ndarray, high: int = 255, norm_type: str = "float", invert: bool = False, title: str = "") -> ndarray: | |
""" | |
Return normalized numpy array between 0 and 'high' value. Default normalization type is int | |
Args: | |
arr: input numpy array | |
high: max value to use for normalization | |
norm_type: type of normalization: could be 'float' or 'int' | |
invert: bool to choose if invert the normalized numpy array | |
title: array title name | |
Returns: | |
ndarray: normalized numpy array | |
""" | |
np.seterr("raise") | |
h_min_arr = np.nanmin(arr) | |
h_arr_max = np.nanmax(arr) | |
try: | |
h_diff = h_arr_max - h_min_arr | |
app_logger.debug( | |
f"normalize_array:: '{title}',h_min_arr:{h_min_arr},h_arr_max:{h_arr_max},h_diff:{h_diff}, dtype:{arr.dtype}.") | |
except Exception as e_h_diff: | |
app_logger.error(f"e_h_diff:{e_h_diff}.") | |
raise ValueError(e_h_diff) | |
if check_empty_array(arr, high) or check_empty_array(arr, h_diff): | |
msg_ve = f"normalize_array::empty array '{title}',h_min_arr:{h_min_arr},h_arr_max:{h_arr_max},h_diff:{h_diff}, dtype:{arr.dtype}." | |
app_logger.error(msg_ve) | |
raise ValueError(msg_ve) | |
try: | |
normalized = high * (arr - h_min_arr) / h_diff | |
normalized = np.nanmax(normalized) - normalized if invert else normalized | |
return normalized.astype(int) if norm_type == "int" else normalized | |
except FloatingPointError as fe: | |
msg = f"normalize_array::{title}:h_arr_max:{h_arr_max},h_min_arr:{h_min_arr},fe:{fe}." | |
app_logger.error(msg) | |
raise ValueError(msg) | |
def normalize_array_list(arr_list: list[ndarray], exaggerations_list: list[float] = None, title: str = "") -> ndarray: | |
""" | |
Return a normalized numpy array from a list of numpy array and an optional list of exaggeration values. | |
Args: | |
arr_list: list of array to use for normalization | |
exaggerations_list: list of exaggeration values | |
title: array title name | |
Returns: | |
ndarray: normalized numpy array | |
""" | |
if not arr_list: | |
msg = f"input list can't be empty:{arr_list}." | |
app_logger.error(msg) | |
raise ValueError(msg) | |
if exaggerations_list is None: | |
exaggerations_list = list(np.ones(len(arr_list))) | |
arr_tmp = np.zeros(arr_list[0].shape) | |
for a, exaggeration in zip(arr_list, exaggerations_list): | |
app_logger.debug(f"normalize_array_list::exaggeration:{exaggeration}.") | |
arr_tmp += normalize_array(a, norm_type="float", title=f"ARRLIST:{title}.") * exaggeration | |
return arr_tmp / len(arr_list) | |
def check_empty_array(arr: ndarray, val: float) -> bool: | |
""" | |
Return True if the input numpy array is empy. Check if | |
- all values are all the same value (0, 1 or given 'val' input float value) | |
- all values that are not NaN are a given 'val' float value | |
Args: | |
arr: input numpy array | |
val: value to use for check if array is empty | |
Returns: | |
bool: True if the input numpy array is empty, False otherwise | |
""" | |
arr_check5_tmp = np.copy(arr) | |
arr_size = arr.shape[0] | |
arr_check3 = np.ones((arr_size, arr_size)) | |
check1 = np.array_equal(arr, arr_check3) | |
check2 = np.array_equal(arr, np.zeros((arr_size, arr_size))) | |
arr_check3 *= val | |
check3 = np.array_equal(arr, arr_check3) | |
arr[np.isnan(arr)] = 0 | |
check4 = np.array_equal(arr, np.zeros((arr_size, arr_size))) | |
arr_check5 = np.ones((arr_size, arr_size)) * val | |
arr_check5_tmp[np.isnan(arr_check5_tmp)] = val | |
check5 = np.array_equal(arr_check5_tmp, arr_check5) | |
app_logger.debug(f"array checks:{check1}, {check2}, {check3}, {check4}, {check5}.") | |
return check1 or check2 or check3 or check4 or check5 | |
def write_raster_png(arr, transform, prefix: str, suffix: str, folder_output_path="/tmp"): | |
from pathlib import Path | |
from rasterio.plot import reshape_as_raster | |
output_filename = Path(folder_output_path) / f"{prefix}_{suffix}.png" | |
with rasterio_open( | |
output_filename, 'w', driver='PNG', | |
height=arr.shape[0], | |
width=arr.shape[1], | |
count=3, | |
dtype=str(arr.dtype), | |
crs=OUTPUT_CRS_STRING, | |
transform=transform) as dst: | |
dst.write(reshape_as_raster(arr)) | |
app_logger.info(f"written:{output_filename} as PNG, use {OUTPUT_CRS_STRING} as CRS.") | |
def write_raster_tiff(arr, transform, prefix: str, suffix: str, folder_output_path="/tmp"): | |
from pathlib import Path | |
output_filename = Path(folder_output_path) / f"{prefix}_{suffix}.tiff" | |
with rasterio_open( | |
output_filename, 'w', driver='GTiff', | |
height=arr.shape[0], | |
width=arr.shape[1], | |
count=1, | |
dtype=str(arr.dtype), | |
crs=OUTPUT_CRS_STRING, | |
transform=transform) as dst: | |
dst.write(arr, 1) | |
app_logger.info(f"written:{output_filename} as TIFF, use {OUTPUT_CRS_STRING} as CRS.") | |