alessandro trinca tornidor
[feat] add option to dump temp images during ML recognition operations
7d48fbe
raw
history blame
12.4 kB
"""helpers for computer vision duties"""
import numpy as np
from numpy import ndarray, bitwise_not
from rasterio import open as rasterio_open
from samgis_lisa_on_cuda import PROJECT_ROOT_FOLDER
from samgis_lisa_on_cuda import app_logger
from samgis_lisa_on_cuda.utilities.constants import OUTPUT_CRS_STRING
from samgis_lisa_on_cuda.utilities.type_hints import XYZTerrainProvidersNames
def get_nextzen_terrain_rgb_formula(red: ndarray, green: ndarray, blue: ndarray) -> ndarray:
"""
Compute a 32-bits 2d digital elevation model from a nextzen 'terrarium' (terrain-rgb) raster.
'Terrarium' format PNG tiles contain raw elevation data in meters, in Mercator projection (EPSG:3857).
All values are positive with a 32,768 offset, split into the red, green, and blue channels,
with 16 bits of integer and 8 bits of fraction. To decode:
(red * 256 + green + blue / 256) - 32768
More details on https://www.mapzen.com/blog/elevation/
Args:
red: red-valued channel image array
green: green-valued channel image array
blue: blue-valued channel image array
Returns:
ndarray: nextzen 'terrarium' 2d digital elevation model raster at 32 bits
"""
return (red * 256 + green + blue / 256) - 32768
def get_mapbox__terrain_rgb_formula(red: ndarray, green: ndarray, blue: ndarray) -> ndarray:
return ((red * 256 * 256 + green * 256 + blue) * 0.1) - 10000
providers_terrain_rgb_formulas = {
XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME: get_mapbox__terrain_rgb_formula,
XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME: get_nextzen_terrain_rgb_formula
}
def _get_2d_array_from_3d(arr: ndarray) -> ndarray:
return arr.reshape(arr.shape[0], arr.shape[1])
def _channel_split(arr: ndarray) -> list[ndarray]:
from numpy import dsplit
return dsplit(arr, arr.shape[-1])
def get_raster_terrain_rgb_like(arr: ndarray, xyz_provider_name, nan_value_int: int = -12000):
"""
Compute a 32-bits 2d digital elevation model from a terrain-rgb raster.
Args:
arr: rgb raster
xyz_provider_name: xyz provider
nan_value_int: threshold int value to replace NaN
Returns:
ndarray: 2d digital elevation model raster at 32 bits
"""
red, green, blue = _channel_split(arr)
dem_rgb = providers_terrain_rgb_formulas[xyz_provider_name](red, green, blue)
output = _get_2d_array_from_3d(dem_rgb)
output[output < nan_value_int] = np.NaN
return output
def get_rgb_prediction_image(raster_cropped: ndarray, slope_cellsize: int, invert_image: bool = True) -> ndarray:
"""
Return an RGB image from input numpy array
Args:
raster_cropped: input numpy array
slope_cellsize: window size to calculate slope and curvature (1st and 2nd degree array derivative)
invert_image:
Returns:
tuple of str: image filename, image path (with filename)
"""
from samgis_lisa_on_cuda.utilities.constants import CHANNEL_EXAGGERATIONS_LIST
try:
slope, curvature = get_slope_curvature(raster_cropped, slope_cellsize=slope_cellsize)
channel0 = raster_cropped
channel1 = normalize_array_list(
[raster_cropped, slope, curvature], CHANNEL_EXAGGERATIONS_LIST, title="channel1_normlist")
channel2 = curvature
return get_rgb_image(channel0, channel1, channel2, invert_image=invert_image)
except ValueError as ve_get_rgb_prediction_image:
msg = f"ve_get_rgb_prediction_image:{ve_get_rgb_prediction_image}."
app_logger.error(msg)
raise ve_get_rgb_prediction_image
def get_rgb_image(arr_channel0: ndarray, arr_channel1: ndarray, arr_channel2: ndarray,
invert_image: bool = True) -> ndarray:
"""
Return an RGB image from input R,G,B channel arrays
Args:
arr_channel0: channel image 0
arr_channel1: channel image 1
arr_channel2: channel image 2
invert_image: invert the RGB image channel order
Returns:
ndarray: RGB image
"""
try:
# RED curvature, GREEN slope, BLUE dem, invert_image=True
if len(arr_channel0.shape) != 2:
msg = f"arr_size, wrong type:{type(arr_channel0)} or arr_size:{arr_channel0.shape}."
app_logger.error(msg)
raise ValueError(msg)
data_rgb = np.zeros((arr_channel0.shape[0], arr_channel0.shape[1], 3), dtype=np.uint8)
app_logger.debug(f"arr_container data_rgb, type:{type(data_rgb)}, arr_shape:{data_rgb.shape}.")
data_rgb[:, :, 0] = normalize_array(
arr_channel0.astype(float), high=1, norm_type="float", title="RGB:channel0") * 64
data_rgb[:, :, 1] = normalize_array(
arr_channel1.astype(float), high=1, norm_type="float", title="RGB:channel1") * 128
data_rgb[:, :, 2] = normalize_array(
arr_channel2.astype(float), high=1, norm_type="float", title="RGB:channel2") * 192
if invert_image:
app_logger.debug(f"data_rgb:{type(data_rgb)}, {data_rgb.dtype}.")
data_rgb = bitwise_not(data_rgb)
return data_rgb
except ValueError as ve_get_rgb_image:
msg = f"ve_get_rgb_image:{ve_get_rgb_image}."
app_logger.error(msg)
raise ve_get_rgb_image
def get_slope_curvature(dem: ndarray, slope_cellsize: int, title: str = "") -> tuple[ndarray, ndarray]:
"""
Return a tuple of two numpy arrays representing slope and curvature (1st grade derivative and 2nd grade derivative)
Args:
dem: input numpy array
slope_cellsize: window size to calculate slope and curvature
title: array name
Returns:
tuple of ndarrays: slope image, curvature image
"""
app_logger.info(f"dem shape:{dem.shape}, slope_cellsize:{slope_cellsize}.")
try:
dem = dem.astype(float)
app_logger.debug("get_slope_curvature:: start")
slope = calculate_slope(dem, slope_cellsize)
app_logger.debug("get_slope_curvature:: created slope raster")
s2c = calculate_slope(slope, slope_cellsize)
curvature = normalize_array(s2c, norm_type="float", title=f"SC:curvature_{title}")
app_logger.debug("get_slope_curvature:: created curvature raster")
return slope, curvature
except ValueError as ve_get_slope_curvature:
msg = f"ve_get_slope_curvature:{ve_get_slope_curvature}."
app_logger.error(msg)
raise ve_get_slope_curvature
def calculate_slope(dem_array: ndarray, cell_size: int, calctype: str = "degree") -> ndarray:
"""
Return a numpy array representing slope (1st grade derivative)
Args:
dem_array: input numpy array
cell_size: window size to calculate slope
calctype: calculus type
Returns:
ndarray: slope image
"""
try:
gradx, grady = np.gradient(dem_array, cell_size)
dem_slope = np.sqrt(gradx ** 2 + grady ** 2)
if calctype == "degree":
dem_slope = np.degrees(np.arctan(dem_slope))
app_logger.debug(f"extracted slope with calctype:{calctype}.")
return dem_slope
except ValueError as ve_calculate_slope:
msg = f"ve_calculate_slope:{ve_calculate_slope}."
app_logger.error(msg)
raise ve_calculate_slope
def normalize_array(arr: ndarray, high: int = 255, norm_type: str = "float", invert: bool = False, title: str = "") -> ndarray:
"""
Return normalized numpy array between 0 and 'high' value. Default normalization type is int
Args:
arr: input numpy array
high: max value to use for normalization
norm_type: type of normalization: could be 'float' or 'int'
invert: bool to choose if invert the normalized numpy array
title: array title name
Returns:
ndarray: normalized numpy array
"""
np.seterr("raise")
h_min_arr = np.nanmin(arr)
h_arr_max = np.nanmax(arr)
try:
h_diff = h_arr_max - h_min_arr
app_logger.debug(
f"normalize_array:: '{title}',h_min_arr:{h_min_arr},h_arr_max:{h_arr_max},h_diff:{h_diff}, dtype:{arr.dtype}.")
except Exception as e_h_diff:
app_logger.error(f"e_h_diff:{e_h_diff}.")
raise ValueError(e_h_diff)
if check_empty_array(arr, high) or check_empty_array(arr, h_diff):
msg_ve = f"normalize_array::empty array '{title}',h_min_arr:{h_min_arr},h_arr_max:{h_arr_max},h_diff:{h_diff}, dtype:{arr.dtype}."
app_logger.error(msg_ve)
raise ValueError(msg_ve)
try:
normalized = high * (arr - h_min_arr) / h_diff
normalized = np.nanmax(normalized) - normalized if invert else normalized
return normalized.astype(int) if norm_type == "int" else normalized
except FloatingPointError as fe:
msg = f"normalize_array::{title}:h_arr_max:{h_arr_max},h_min_arr:{h_min_arr},fe:{fe}."
app_logger.error(msg)
raise ValueError(msg)
def normalize_array_list(arr_list: list[ndarray], exaggerations_list: list[float] = None, title: str = "") -> ndarray:
"""
Return a normalized numpy array from a list of numpy array and an optional list of exaggeration values.
Args:
arr_list: list of array to use for normalization
exaggerations_list: list of exaggeration values
title: array title name
Returns:
ndarray: normalized numpy array
"""
if not arr_list:
msg = f"input list can't be empty:{arr_list}."
app_logger.error(msg)
raise ValueError(msg)
if exaggerations_list is None:
exaggerations_list = list(np.ones(len(arr_list)))
arr_tmp = np.zeros(arr_list[0].shape)
for a, exaggeration in zip(arr_list, exaggerations_list):
app_logger.debug(f"normalize_array_list::exaggeration:{exaggeration}.")
arr_tmp += normalize_array(a, norm_type="float", title=f"ARRLIST:{title}.") * exaggeration
return arr_tmp / len(arr_list)
def check_empty_array(arr: ndarray, val: float) -> bool:
"""
Return True if the input numpy array is empy. Check if
- all values are all the same value (0, 1 or given 'val' input float value)
- all values that are not NaN are a given 'val' float value
Args:
arr: input numpy array
val: value to use for check if array is empty
Returns:
bool: True if the input numpy array is empty, False otherwise
"""
arr_check5_tmp = np.copy(arr)
arr_size = arr.shape[0]
arr_check3 = np.ones((arr_size, arr_size))
check1 = np.array_equal(arr, arr_check3)
check2 = np.array_equal(arr, np.zeros((arr_size, arr_size)))
arr_check3 *= val
check3 = np.array_equal(arr, arr_check3)
arr[np.isnan(arr)] = 0
check4 = np.array_equal(arr, np.zeros((arr_size, arr_size)))
arr_check5 = np.ones((arr_size, arr_size)) * val
arr_check5_tmp[np.isnan(arr_check5_tmp)] = val
check5 = np.array_equal(arr_check5_tmp, arr_check5)
app_logger.debug(f"array checks:{check1}, {check2}, {check3}, {check4}, {check5}.")
return check1 or check2 or check3 or check4 or check5
def write_raster_png(arr, transform, prefix: str, suffix: str, folder_output_path="/tmp"):
from pathlib import Path
from rasterio.plot import reshape_as_raster
output_filename = Path(folder_output_path) / f"{prefix}_{suffix}.png"
with rasterio_open(
output_filename, 'w', driver='PNG',
height=arr.shape[0],
width=arr.shape[1],
count=3,
dtype=str(arr.dtype),
crs=OUTPUT_CRS_STRING,
transform=transform) as dst:
dst.write(reshape_as_raster(arr))
app_logger.info(f"written:{output_filename} as PNG, use {OUTPUT_CRS_STRING} as CRS.")
def write_raster_tiff(arr, transform, prefix: str, suffix: str, folder_output_path="/tmp"):
from pathlib import Path
output_filename = Path(folder_output_path) / f"{prefix}_{suffix}.tiff"
with rasterio_open(
output_filename, 'w', driver='GTiff',
height=arr.shape[0],
width=arr.shape[1],
count=1,
dtype=str(arr.dtype),
crs=OUTPUT_CRS_STRING,
transform=transform) as dst:
dst.write(arr, 1)
app_logger.info(f"written:{output_filename} as TIFF, use {OUTPUT_CRS_STRING} as CRS.")