File size: 12,360 Bytes
0914710 7d48fbe 0914710 7d48fbe fcb8c81 7d48fbe fcb8c81 0914710 fcb8c81 0914710 7d48fbe |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 |
"""helpers for computer vision duties"""
import numpy as np
from numpy import ndarray, bitwise_not
from rasterio import open as rasterio_open
from samgis_lisa_on_cuda import PROJECT_ROOT_FOLDER
from samgis_lisa_on_cuda import app_logger
from samgis_lisa_on_cuda.utilities.constants import OUTPUT_CRS_STRING
from samgis_lisa_on_cuda.utilities.type_hints import XYZTerrainProvidersNames
def get_nextzen_terrain_rgb_formula(red: ndarray, green: ndarray, blue: ndarray) -> ndarray:
"""
Compute a 32-bits 2d digital elevation model from a nextzen 'terrarium' (terrain-rgb) raster.
'Terrarium' format PNG tiles contain raw elevation data in meters, in Mercator projection (EPSG:3857).
All values are positive with a 32,768 offset, split into the red, green, and blue channels,
with 16 bits of integer and 8 bits of fraction. To decode:
(red * 256 + green + blue / 256) - 32768
More details on https://www.mapzen.com/blog/elevation/
Args:
red: red-valued channel image array
green: green-valued channel image array
blue: blue-valued channel image array
Returns:
ndarray: nextzen 'terrarium' 2d digital elevation model raster at 32 bits
"""
return (red * 256 + green + blue / 256) - 32768
def get_mapbox__terrain_rgb_formula(red: ndarray, green: ndarray, blue: ndarray) -> ndarray:
return ((red * 256 * 256 + green * 256 + blue) * 0.1) - 10000
providers_terrain_rgb_formulas = {
XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME: get_mapbox__terrain_rgb_formula,
XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME: get_nextzen_terrain_rgb_formula
}
def _get_2d_array_from_3d(arr: ndarray) -> ndarray:
return arr.reshape(arr.shape[0], arr.shape[1])
def _channel_split(arr: ndarray) -> list[ndarray]:
from numpy import dsplit
return dsplit(arr, arr.shape[-1])
def get_raster_terrain_rgb_like(arr: ndarray, xyz_provider_name, nan_value_int: int = -12000):
"""
Compute a 32-bits 2d digital elevation model from a terrain-rgb raster.
Args:
arr: rgb raster
xyz_provider_name: xyz provider
nan_value_int: threshold int value to replace NaN
Returns:
ndarray: 2d digital elevation model raster at 32 bits
"""
red, green, blue = _channel_split(arr)
dem_rgb = providers_terrain_rgb_formulas[xyz_provider_name](red, green, blue)
output = _get_2d_array_from_3d(dem_rgb)
output[output < nan_value_int] = np.NaN
return output
def get_rgb_prediction_image(raster_cropped: ndarray, slope_cellsize: int, invert_image: bool = True) -> ndarray:
"""
Return an RGB image from input numpy array
Args:
raster_cropped: input numpy array
slope_cellsize: window size to calculate slope and curvature (1st and 2nd degree array derivative)
invert_image:
Returns:
tuple of str: image filename, image path (with filename)
"""
from samgis_lisa_on_cuda.utilities.constants import CHANNEL_EXAGGERATIONS_LIST
try:
slope, curvature = get_slope_curvature(raster_cropped, slope_cellsize=slope_cellsize)
channel0 = raster_cropped
channel1 = normalize_array_list(
[raster_cropped, slope, curvature], CHANNEL_EXAGGERATIONS_LIST, title="channel1_normlist")
channel2 = curvature
return get_rgb_image(channel0, channel1, channel2, invert_image=invert_image)
except ValueError as ve_get_rgb_prediction_image:
msg = f"ve_get_rgb_prediction_image:{ve_get_rgb_prediction_image}."
app_logger.error(msg)
raise ve_get_rgb_prediction_image
def get_rgb_image(arr_channel0: ndarray, arr_channel1: ndarray, arr_channel2: ndarray,
invert_image: bool = True) -> ndarray:
"""
Return an RGB image from input R,G,B channel arrays
Args:
arr_channel0: channel image 0
arr_channel1: channel image 1
arr_channel2: channel image 2
invert_image: invert the RGB image channel order
Returns:
ndarray: RGB image
"""
try:
# RED curvature, GREEN slope, BLUE dem, invert_image=True
if len(arr_channel0.shape) != 2:
msg = f"arr_size, wrong type:{type(arr_channel0)} or arr_size:{arr_channel0.shape}."
app_logger.error(msg)
raise ValueError(msg)
data_rgb = np.zeros((arr_channel0.shape[0], arr_channel0.shape[1], 3), dtype=np.uint8)
app_logger.debug(f"arr_container data_rgb, type:{type(data_rgb)}, arr_shape:{data_rgb.shape}.")
data_rgb[:, :, 0] = normalize_array(
arr_channel0.astype(float), high=1, norm_type="float", title="RGB:channel0") * 64
data_rgb[:, :, 1] = normalize_array(
arr_channel1.astype(float), high=1, norm_type="float", title="RGB:channel1") * 128
data_rgb[:, :, 2] = normalize_array(
arr_channel2.astype(float), high=1, norm_type="float", title="RGB:channel2") * 192
if invert_image:
app_logger.debug(f"data_rgb:{type(data_rgb)}, {data_rgb.dtype}.")
data_rgb = bitwise_not(data_rgb)
return data_rgb
except ValueError as ve_get_rgb_image:
msg = f"ve_get_rgb_image:{ve_get_rgb_image}."
app_logger.error(msg)
raise ve_get_rgb_image
def get_slope_curvature(dem: ndarray, slope_cellsize: int, title: str = "") -> tuple[ndarray, ndarray]:
"""
Return a tuple of two numpy arrays representing slope and curvature (1st grade derivative and 2nd grade derivative)
Args:
dem: input numpy array
slope_cellsize: window size to calculate slope and curvature
title: array name
Returns:
tuple of ndarrays: slope image, curvature image
"""
app_logger.info(f"dem shape:{dem.shape}, slope_cellsize:{slope_cellsize}.")
try:
dem = dem.astype(float)
app_logger.debug("get_slope_curvature:: start")
slope = calculate_slope(dem, slope_cellsize)
app_logger.debug("get_slope_curvature:: created slope raster")
s2c = calculate_slope(slope, slope_cellsize)
curvature = normalize_array(s2c, norm_type="float", title=f"SC:curvature_{title}")
app_logger.debug("get_slope_curvature:: created curvature raster")
return slope, curvature
except ValueError as ve_get_slope_curvature:
msg = f"ve_get_slope_curvature:{ve_get_slope_curvature}."
app_logger.error(msg)
raise ve_get_slope_curvature
def calculate_slope(dem_array: ndarray, cell_size: int, calctype: str = "degree") -> ndarray:
"""
Return a numpy array representing slope (1st grade derivative)
Args:
dem_array: input numpy array
cell_size: window size to calculate slope
calctype: calculus type
Returns:
ndarray: slope image
"""
try:
gradx, grady = np.gradient(dem_array, cell_size)
dem_slope = np.sqrt(gradx ** 2 + grady ** 2)
if calctype == "degree":
dem_slope = np.degrees(np.arctan(dem_slope))
app_logger.debug(f"extracted slope with calctype:{calctype}.")
return dem_slope
except ValueError as ve_calculate_slope:
msg = f"ve_calculate_slope:{ve_calculate_slope}."
app_logger.error(msg)
raise ve_calculate_slope
def normalize_array(arr: ndarray, high: int = 255, norm_type: str = "float", invert: bool = False, title: str = "") -> ndarray:
"""
Return normalized numpy array between 0 and 'high' value. Default normalization type is int
Args:
arr: input numpy array
high: max value to use for normalization
norm_type: type of normalization: could be 'float' or 'int'
invert: bool to choose if invert the normalized numpy array
title: array title name
Returns:
ndarray: normalized numpy array
"""
np.seterr("raise")
h_min_arr = np.nanmin(arr)
h_arr_max = np.nanmax(arr)
try:
h_diff = h_arr_max - h_min_arr
app_logger.debug(
f"normalize_array:: '{title}',h_min_arr:{h_min_arr},h_arr_max:{h_arr_max},h_diff:{h_diff}, dtype:{arr.dtype}.")
except Exception as e_h_diff:
app_logger.error(f"e_h_diff:{e_h_diff}.")
raise ValueError(e_h_diff)
if check_empty_array(arr, high) or check_empty_array(arr, h_diff):
msg_ve = f"normalize_array::empty array '{title}',h_min_arr:{h_min_arr},h_arr_max:{h_arr_max},h_diff:{h_diff}, dtype:{arr.dtype}."
app_logger.error(msg_ve)
raise ValueError(msg_ve)
try:
normalized = high * (arr - h_min_arr) / h_diff
normalized = np.nanmax(normalized) - normalized if invert else normalized
return normalized.astype(int) if norm_type == "int" else normalized
except FloatingPointError as fe:
msg = f"normalize_array::{title}:h_arr_max:{h_arr_max},h_min_arr:{h_min_arr},fe:{fe}."
app_logger.error(msg)
raise ValueError(msg)
def normalize_array_list(arr_list: list[ndarray], exaggerations_list: list[float] = None, title: str = "") -> ndarray:
"""
Return a normalized numpy array from a list of numpy array and an optional list of exaggeration values.
Args:
arr_list: list of array to use for normalization
exaggerations_list: list of exaggeration values
title: array title name
Returns:
ndarray: normalized numpy array
"""
if not arr_list:
msg = f"input list can't be empty:{arr_list}."
app_logger.error(msg)
raise ValueError(msg)
if exaggerations_list is None:
exaggerations_list = list(np.ones(len(arr_list)))
arr_tmp = np.zeros(arr_list[0].shape)
for a, exaggeration in zip(arr_list, exaggerations_list):
app_logger.debug(f"normalize_array_list::exaggeration:{exaggeration}.")
arr_tmp += normalize_array(a, norm_type="float", title=f"ARRLIST:{title}.") * exaggeration
return arr_tmp / len(arr_list)
def check_empty_array(arr: ndarray, val: float) -> bool:
"""
Return True if the input numpy array is empy. Check if
- all values are all the same value (0, 1 or given 'val' input float value)
- all values that are not NaN are a given 'val' float value
Args:
arr: input numpy array
val: value to use for check if array is empty
Returns:
bool: True if the input numpy array is empty, False otherwise
"""
arr_check5_tmp = np.copy(arr)
arr_size = arr.shape[0]
arr_check3 = np.ones((arr_size, arr_size))
check1 = np.array_equal(arr, arr_check3)
check2 = np.array_equal(arr, np.zeros((arr_size, arr_size)))
arr_check3 *= val
check3 = np.array_equal(arr, arr_check3)
arr[np.isnan(arr)] = 0
check4 = np.array_equal(arr, np.zeros((arr_size, arr_size)))
arr_check5 = np.ones((arr_size, arr_size)) * val
arr_check5_tmp[np.isnan(arr_check5_tmp)] = val
check5 = np.array_equal(arr_check5_tmp, arr_check5)
app_logger.debug(f"array checks:{check1}, {check2}, {check3}, {check4}, {check5}.")
return check1 or check2 or check3 or check4 or check5
def write_raster_png(arr, transform, prefix: str, suffix: str, folder_output_path="/tmp"):
from pathlib import Path
from rasterio.plot import reshape_as_raster
output_filename = Path(folder_output_path) / f"{prefix}_{suffix}.png"
with rasterio_open(
output_filename, 'w', driver='PNG',
height=arr.shape[0],
width=arr.shape[1],
count=3,
dtype=str(arr.dtype),
crs=OUTPUT_CRS_STRING,
transform=transform) as dst:
dst.write(reshape_as_raster(arr))
app_logger.info(f"written:{output_filename} as PNG, use {OUTPUT_CRS_STRING} as CRS.")
def write_raster_tiff(arr, transform, prefix: str, suffix: str, folder_output_path="/tmp"):
from pathlib import Path
output_filename = Path(folder_output_path) / f"{prefix}_{suffix}.tiff"
with rasterio_open(
output_filename, 'w', driver='GTiff',
height=arr.shape[0],
width=arr.shape[1],
count=1,
dtype=str(arr.dtype),
crs=OUTPUT_CRS_STRING,
transform=transform) as dst:
dst.write(arr, 1)
app_logger.info(f"written:{output_filename} as TIFF, use {OUTPUT_CRS_STRING} as CRS.")
|