Ignaciobfp's picture
Update app.py
97bb9d9
from huggingface_hub import from_pretrained_fastai
import gradio as gr
from fastai.vision.all import *
import PIL
import torchvision.transforms as transforms
##Extras por si pudiera reconstruir la imagen en HF tambi茅n
import numpy as np
import os
import cv2
def extract_subimages(image : np.ndarray, wwidth, wheight, overlap_fraction):
"""
Extracts subimages of the input image using a moving window of size (wwidth, wheight)
with the specified overlap fraction. Returns a tuple (subimages, coords) where subimages
is a list of subimages and coords is a list of tuples (x, y) indicating the top left corner
coordinates of each subimage in the input image.
"""
subimages = []
coords = []
height, width, channels = image.shape
if channels > 3:
image = image[:,:,0:3]
channels = 3
overlap = int(max(0, min(overlap_fraction, 1)) * min(wwidth, wheight))
y = 0
while y + wheight <= height:
x = 0
while x + wwidth <= width:
subimage = image[y:y+wheight, x:x+wwidth, :]
subimages.append(subimage)
coords.append((x, y))
x += wwidth - overlap
y += wheight - overlap
if y < height:
y = height - wheight
x = 0
while x + wwidth <= width:
subimage = image[y:y+wheight, x:x+wwidth, :]
subimages.append(subimage)
coords.append((x, y))
x += wwidth - overlap
if x < width:
x = width - wwidth
subimage = image[y:y+wheight, x:x+wwidth, :]
subimages.append(subimage)
coords.append((x, y))
if x < width:
x = width - wwidth
y = 0
while y + wheight <= height:
subimage = image[y:y+wheight, x:x+wwidth, :]
subimages.append(subimage)
coords.append((x, y))
y += wheight - overlap
if y < height:
y = height - wheight
subimage = image[y:y+wheight, x:x+wwidth, :]
subimages.append(subimage)
coords.append((x, y))
return subimages, coords
# Si no hay archivos tif (labels) no se tratan, no hace falta considerarlo
def generate_and_save_subimages(path, output_dir_images, output_dir_labels = None):
if output_dir_labels:
if not os.path.exists(output_dir_labels):
os.makedirs(output_dir_labels)
if not os.path.exists(output_dir_images):
os.makedirs(output_dir_images)
for filename in os.listdir(path):
if filename.endswith(".png") or filename.endswith(".tif"):
filepath = os.path.join(path, filename)
image = cv2.imread(filepath)
subimages, coords = extract_subimages(image, 400, 400, 0.66)
for i, subimage in enumerate(subimages):
if filename.endswith(".png"):
output_filename = os.path.join(output_dir_images, f"{filename.rsplit('.', 1)[0]}_{coords[i][0]}_{coords[i][1]}.png")
cv2.imwrite(output_filename, subimage)
else:
if output_dir_labels:
output_filename = os.path.join(output_dir_labels, f"{filename.rsplit('.', 1)[0]}_{coords[i][0]}_{coords[i][1]}.tif")
cv2.imwrite(output_filename, subimage)
def generate_and_save_subimages_nolabel(path, output_dir_images, olverlap=0.0, imagesformat="png", split_in_dirs=True):
for entry in os.scandir(path):
if entry.is_file() and entry.name.lower().endswith(imagesformat):
filepath = entry.path
gss_single(filepath, output_dir_images, olverlap, imagesformat, split_in_dirs)
def gss_single(filepath, output_dir_images, olverlap=0.0, imagesformat="png", split_in_dirs=True):
image = cv2.imread(filepath)
if split_in_dirs:
dir_this_image = Path(output_dir_images)/filepath.rsplit('.', 1)[0]
os.makedirs(dir_this_image, exist_ok=True)
else:
os.makedirs(output_dir_images, exist_ok=True)
subimages, coords = extract_subimages(image, 400, 400, olverlap)
for i, subimage in enumerate(subimages):
if split_in_dirs:
output_filename = os.path.join(dir_this_image, f"{filepath.rsplit('.', 1)[0]}_{coords[i][0]}_{coords[i][1]}.png")
else:
output_filename = os.path.join(output_dir_images, f"{filepath.rsplit('.', 1)[0]}_{coords[i][0]}_{coords[i][1]}.png")
cv2.imwrite(output_filename, subimage)
def split_windows_in_folders(input_images_folder, output_images_folder):
for filename in os.listdir(input_images_folder):
dir_this_image = Path(output_images_folder)/filename.rsplit('.', 1)[0]
os.makedirs(dir_this_image, exist_ok=True)
if filename.endswith(".png"):
print(str(dir_this_image))
filepath = os.path.join(path, filename)
image = cv2.imread(filepath)
subimages, coords = extract_subimages(image, 400, 400, 0)
for i, subimage in enumerate(subimages):
output_filename = os.path.join(dir_this_image, f"{filename.rsplit('.', 1)[0]}_{coords[i][0]}_{coords[i][1]}.png")
cv2.imwrite(output_filename, subimage)
def subimages_from_directory(directorio):
# Define el directorio a recorrer
directorio = directorio
# Define la expresi贸n regular para buscar los n煤meros X e Y en el nombre de archivo
patron = re.compile(r"(.*)_(\d+)_(\d+)\.(png|jpg|tif)")
windowlist = []
coords = []
# Recorre el directorio en busca de im谩genes
for filename in os.listdir(directorio):
match = patron.search(filename)
if match:
origname = match.group(1)
x = int(match.group(2))
y = int(match.group(3))
#print(f"El archivo {filename} tiene los n煤meros X={x} e Y={y}")
img = cv2.imread(os.path.join(directorio, filename))
windowlist.append(img)
coords.append((x, y))
# Ordena las listas por coordenadas X e Y
windowlist, coords = zip(*sorted(zip(windowlist, coords), key=lambda pair: (pair[1][0], pair[1][1])))
wh, ww, chan = windowlist[0].shape
origsize = tuple(elem1 + elem2 for elem1, elem2 in zip(coords[-1], (wh,ww)))
return windowlist, coords, wh, ww, chan, origsize
def subimages_onlypath(directorio):
# Define el directorio a recorrer
directorio = directorio
pathlist = []
patron = re.compile(r"(.*)_(\d+)_(\d+)\.(png|jpg|tif)")
for filename in os.listdir(directorio):
match = patron.search(filename)
if match:
pathlist.append(os.path.join(directorio, filename))
return pathlist
def ReconstructFromMW(windowlist, coords, wh, ww, chan, origsize):
canvas = np.zeros((origsize[1], origsize[0], chan), dtype=np.uint8)
for idx, window in enumerate(windowlist):
canvas[coords[idx][1]:coords[idx][1]+wh, coords[idx][0]:coords[idx][0]+ww, :] = window
return canvas
def get_list_tp(path):
list_to_process = [] # Inicializar la lista que contendr谩 los nombres de los subdirectorios
list_names = []
# Recorrer los elementos del directorio
for element in os.scandir(path):
# Verificar si el elemento es un directorio
if element.is_dir():
# Agregar el nombre del subdirectorio a la lista
windowlist, coords, wh, ww, chan, origsize = subimages_from_directory(element)
list_to_process.append(ReconstructFromMW(windowlist, coords, wh, ww, chan, origsize))
list_names.append(element.name)
return list_to_process, list_names
def get_paths_tp(path):
list_to_process = [] # Inicializar la lista que contendr谩 los nombres de los subdirectorios
# Recorrer los elementos del directorio
for element in os.scandir(path):
# Verificar si el elemento es un directorio
if element.is_dir():
# Agregar el nombre del subdirectorio a la lista
list_to_process.append(subimages_onlypath(element))
return list_to_process
def process_multifolder(process_folders, result_folder):
for folder in process_folders:
folname = os.path.basename(os.path.dirname(folder[0]))
destname = Path(result_folder)/folname
os.makedirs(destname, exist_ok=True)
for subimagepath in folder:
img = PIL.Image.open(subimagepath)
image = transforms.Resize((400,400))(img)
tensor = transform_image(image=image)
with torch.no_grad():
outputs = model(tensor)
outputs = torch.argmax(outputs,1)
mask = np.array(outputs.cpu())
mask[mask==1]=255
mask=np.reshape(mask,(400,400))
mask_img = Image.fromarray(mask.astype('uint8'))
filename = os.path.basename(subimagepath)
new_image_path = os.path.join(result_folder, folname, filename)
mask_img.save(new_image_path)
def recombine_windows(results_folder_w, result_f_rec):
imgs, nombres = get_list_tp(results_folder_w)
os.makedirs(result_f_rec, exist_ok=True)
for idx, image in enumerate(imgs):
img = Image.fromarray(image)
new_image_path = os.path.join(result_f_rec, nombres[idx] + '.tif')
img.save(new_image_path, compression='tiff_lzw')
return new_image_path
def process_single_image(single_image_path, base_f, pro_f, rsw_f, rsd_f):
gss_single(single_image_path, pro_f, 0, "tif", True)
process_multifolder(get_paths_tp(pro_f),rsw_f)
pt = recombine_windows(rsw_f,rsd_f)
shutil.rmtree(pro_f)
shutil.rmtree(rsw_f)
#copiar_info_georref(single_image_path, pt)
return pt
# from osgeo import gdal, osr
# def copiar_info_georref(entrada, salida):
# try:
# # Abrir el archivo GeoTIFF original
# original_dataset = gdal.Open(entrada)
# # Obtener la informaci贸n de georreferenciaci贸n del archivo original
# original_projection = original_dataset.GetProjection()
# original_geotransform = original_dataset.GetGeoTransform()
# # Abrir la imagen resultado
# result_dataset = gdal.Open(salida, gdal.GA_Update)
# # Copiar la informaci贸n de georreferenciaci贸n del archivo original a la imagen resultado
# result_dataset.SetProjection(original_projection)
# result_dataset.SetGeoTransform(original_geotransform)
# # Cerrar los archivos
# original_dataset = None
# result_dataset = None
# except Exception as e:
# print("Error: ", e)
###FIN de extras
#repo_id = "Ignaciobfp/segmentacion-dron-marras"
#learner = from_pretrained_fastai(repo_id)
device = torch.device("cpu")
#model = learner.model
model = torch.jit.load("modelo_marras.pth")
model = model.cpu()
def transform_image(image):
my_transforms = transforms.Compose([transforms.ToTensor(),
transforms.Normalize(
[0.485, 0.456, 0.406],
[0.229, 0.224, 0.225])])
image_aux = image
return my_transforms(image_aux).unsqueeze(0).to(device)
# Definimos una funci贸n que se encarga de llevar a cabo las predicciones
def predict(img):
img_pil = PIL.Image.fromarray(img, 'RGB')
image = transforms.Resize((400,400))(img_pil)
tensor = transform_image(image=image)
model.to(device)
with torch.no_grad():
outputs = model(tensor)
outputs = torch.argmax(outputs,1)
mask = np.array(outputs.cpu())
mask[mask==1]=255
mask=np.reshape(mask,(400,400))
return Image.fromarray(mask.astype('uint8'))
def predict_full(img):
# Obtener la ruta actual
ruta_actual = Path(".")
# Imprimir la ruta actual en la consola
print(f"La ruta actual es: {ruta_actual.resolve()}")
single_image_path = "/home/user/app/tmp.tif"
base_f = "."
pro_f = "processing"
rsw_f = "results_windows"
rsd_f = "results_together"
destpath = process_single_image(single_image_path, base_f, pro_f, rsw_f, rsd_f)
im = Image.open(destpath)
return im
# Creamos la interfaz y la lanzamos.
gr.Interface(fn=predict_full, inputs=gr.inputs.Image(), outputs=gr.outputs.Image(type="pil")).launch(share=False)