from huggingface_hub import from_pretrained_fastai
import gradio as gr
from fastai.vision.all import *
import PIL
from PIL import Image
from pathlib import Path

import torchvision.transforms as transforms
import numpy as np
import os
import re
import shutil
import cv2
import torch

def extract_subimages(image: np.ndarray, wwidth, wheight, overlap_fraction):
    """
    Extracts subimages of the input image using a moving window of size (wwidth, wheight)
    with the specified overlap fraction. Returns a tuple (subimages, coords) where subimages
    is a list of subimages and coords is a list of tuples (x, y) indicating the top left corner
    coordinates of each subimage in the input image.
    """
    subimages = []
    coords = []
    height, width, channels = image.shape
    if channels > 3:
        # Drop any extra channels (e.g. alpha) and keep only the first three.
        image = image[:, :, 0:3]
        channels = 3
    overlap = int(max(0, min(overlap_fraction, 1)) * min(wwidth, wheight))

    # Regular grid of windows.
    y = 0
    while y + wheight <= height:
        x = 0
        while x + wwidth <= width:
            subimage = image[y:y+wheight, x:x+wwidth, :]
            subimages.append(subimage)
            coords.append((x, y))
            x += wwidth - overlap
        y += wheight - overlap

    # Extra row of windows flush with the bottom edge.
    if y < height:
        y = height - wheight
        x = 0
        while x + wwidth <= width:
            subimage = image[y:y+wheight, x:x+wwidth, :]
            subimages.append(subimage)
            coords.append((x, y))
            x += wwidth - overlap
        if x < width:
            x = width - wwidth
            subimage = image[y:y+wheight, x:x+wwidth, :]
            subimages.append(subimage)
            coords.append((x, y))

    # Extra column of windows flush with the right edge, plus the bottom-right corner.
    if x < width:
        x = width - wwidth
        y = 0
        while y + wheight <= height:
            subimage = image[y:y+wheight, x:x+wwidth, :]
            subimages.append(subimage)
            coords.append((x, y))
            y += wheight - overlap
        if y < height:
            y = height - wheight
            subimage = image[y:y+wheight, x:x+wwidth, :]
            subimages.append(subimage)
            coords.append((x, y))
    return subimages, coords

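# Minimal usage sketch (hypothetical helper, not called anywhere in this app):
# tile a synthetic RGB image into 400x400 windows with 50% overlap and inspect
# how many windows come back and where their top-left corners sit.
def _demo_extract_subimages():
    dummy = np.zeros((900, 1300, 3), dtype=np.uint8)  # (height, width, channels)
    windows, corners = extract_subimages(dummy, 400, 400, 0.5)
    # Each window is a 400x400x3 array; corners holds its (x, y) top-left coordinate.
    print(len(windows), windows[0].shape, corners[:3])
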
def generate_and_save_subimages(path, output_dir_images, output_dir_labels=None):
    """Cuts every .png (image) and .tif (label) in `path` into 400x400 windows with
    66% overlap and saves them under the corresponding output directory."""
    if output_dir_labels:
        if not os.path.exists(output_dir_labels):
            os.makedirs(output_dir_labels)

    if not os.path.exists(output_dir_images):
        os.makedirs(output_dir_images)

    for filename in os.listdir(path):
        if filename.endswith(".png") or filename.endswith(".tif"):
            filepath = os.path.join(path, filename)
            image = cv2.imread(filepath)
            subimages, coords = extract_subimages(image, 400, 400, 0.66)
            for i, subimage in enumerate(subimages):
                if filename.endswith(".png"):
                    output_filename = os.path.join(output_dir_images, f"{filename.rsplit('.', 1)[0]}_{coords[i][0]}_{coords[i][1]}.png")
                    cv2.imwrite(output_filename, subimage)
                else:
                    if output_dir_labels:
                        output_filename = os.path.join(output_dir_labels, f"{filename.rsplit('.', 1)[0]}_{coords[i][0]}_{coords[i][1]}.tif")
                        cv2.imwrite(output_filename, subimage)

def generate_and_save_subimages_nolabel(path, output_dir_images, overlap=0.0, imagesformat="png", split_in_dirs=True):
    for entry in os.scandir(path):
        if entry.is_file() and entry.name.lower().endswith(imagesformat):
            filepath = entry.path
            gss_single(filepath, output_dir_images, overlap, imagesformat, split_in_dirs)


def gss_single(filepath, output_dir_images, overlap=0.0, imagesformat="png", split_in_dirs=True):
    image = cv2.imread(filepath)

    # Use only the file name (without directories or extension) to build output paths,
    # so absolute input paths do not leak into the output directory structure.
    stem = Path(filepath).stem

    if split_in_dirs:
        dir_this_image = Path(output_dir_images)/stem
        os.makedirs(dir_this_image, exist_ok=True)
    else:
        os.makedirs(output_dir_images, exist_ok=True)

    subimages, coords = extract_subimages(image, 400, 400, overlap)
    for i, subimage in enumerate(subimages):
        if split_in_dirs:
            output_filename = os.path.join(dir_this_image, f"{stem}_{coords[i][0]}_{coords[i][1]}.png")
        else:
            output_filename = os.path.join(output_dir_images, f"{stem}_{coords[i][0]}_{coords[i][1]}.png")
        cv2.imwrite(output_filename, subimage)

def split_windows_in_folders(input_images_folder, output_images_folder):
    for filename in os.listdir(input_images_folder):
        dir_this_image = Path(output_images_folder)/filename.rsplit('.', 1)[0]
        os.makedirs(dir_this_image, exist_ok=True)
        if filename.endswith(".png"):
            print(str(dir_this_image))
            filepath = os.path.join(input_images_folder, filename)
            image = cv2.imread(filepath)
            subimages, coords = extract_subimages(image, 400, 400, 0)
            for i, subimage in enumerate(subimages):
                output_filename = os.path.join(dir_this_image, f"{filename.rsplit('.', 1)[0]}_{coords[i][0]}_{coords[i][1]}.png")
                cv2.imwrite(output_filename, subimage)

def subimages_from_directory(directorio):
    """Reads every window named like <name>_<x>_<y>.<ext> from `directorio` and returns
    the windows, their (x, y) coordinates, the window size, the channel count and the
    (width, height) of the original image they were cut from."""
    patron = re.compile(r"(.*)_(\d+)_(\d+)\.(png|jpg|tif)")

    windowlist = []
    coords = []

    for filename in os.listdir(directorio):
        match = patron.search(filename)
        if match:
            origname = match.group(1)
            x = int(match.group(2))
            y = int(match.group(3))

            img = cv2.imread(os.path.join(directorio, filename))
            windowlist.append(img)
            coords.append((x, y))

    # Sort windows by their top-left coordinates so the last one is the bottom-right window.
    windowlist, coords = zip(*sorted(zip(windowlist, coords), key=lambda pair: (pair[1][0], pair[1][1])))
    wh, ww, chan = windowlist[0].shape
    # Original size as (width, height): bottom-right corner plus one window.
    origsize = (coords[-1][0] + ww, coords[-1][1] + wh)

    return windowlist, coords, wh, ww, chan, origsize

def subimages_onlypath(directorio):
    pathlist = []

    patron = re.compile(r"(.*)_(\d+)_(\d+)\.(png|jpg|tif)")

    for filename in os.listdir(directorio):
        match = patron.search(filename)
        if match:
            pathlist.append(os.path.join(directorio, filename))

    return pathlist

def ReconstructFromMW(windowlist, coords, wh, ww, chan, origsize):
    """Pastes the windows back onto a blank canvas of the original (width, height) size."""
    canvas = np.zeros((origsize[1], origsize[0], chan), dtype=np.uint8)
    for idx, window in enumerate(windowlist):
        canvas[coords[idx][1]:coords[idx][1]+wh, coords[idx][0]:coords[idx][0]+ww, :] = window
    return canvas

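# Round-trip sketch (hypothetical, not called by the app): windows produced by
# extract_subimages can be pasted back with ReconstructFromMW as long as the
# coordinates, window size, channel count and original (width, height) are kept,
# which is exactly what subimages_from_directory recovers from the file names.
def _demo_window_roundtrip():
    original = np.random.randint(0, 255, (800, 1200, 3), dtype=np.uint8)
    windows, corners = extract_subimages(original, 400, 400, 0.0)
    rebuilt = ReconstructFromMW(windows, corners, 400, 400, 3, (1200, 800))
    assert np.array_equal(rebuilt, original)
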
def get_list_tp(path):
    list_to_process = []
    list_names = []

    for element in os.scandir(path):
        if element.is_dir():
            windowlist, coords, wh, ww, chan, origsize = subimages_from_directory(element)
            list_to_process.append(ReconstructFromMW(windowlist, coords, wh, ww, chan, origsize))
            list_names.append(element.name)
    return list_to_process, list_names


def get_paths_tp(path):
    list_to_process = []

    for element in os.scandir(path):
        if element.is_dir():
            list_to_process.append(subimages_onlypath(element))
    return list_to_process

def process_multifolder(process_folders, result_folder):
    for folder in process_folders:
        folname = os.path.basename(os.path.dirname(folder[0]))
        destname = Path(result_folder)/folname
        os.makedirs(destname, exist_ok=True)
        for subimagepath in folder:
            img = PIL.Image.open(subimagepath)
            image = transforms.Resize((400, 400))(img)
            tensor = transform_image(image=image)
            # Run the segmentation model on the window and turn the per-pixel
            # argmax into a binary 0/255 mask.
            with torch.no_grad():
                outputs = model(tensor)
            outputs = torch.argmax(outputs, 1)
            mask = np.array(outputs.cpu())
            mask[mask == 1] = 255
            mask = np.reshape(mask, (400, 400))
            mask_img = Image.fromarray(mask.astype('uint8'))

            filename = os.path.basename(subimagepath)
            new_image_path = os.path.join(result_folder, folname, filename)
            mask_img.save(new_image_path)

def recombine_windows(results_folder_w, result_f_rec):
    imgs, nombres = get_list_tp(results_folder_w)
    os.makedirs(result_f_rec, exist_ok=True)

    for idx, image in enumerate(imgs):
        img = Image.fromarray(image)
        new_image_path = os.path.join(result_f_rec, nombres[idx] + '.tif')
        img.save(new_image_path, compression='tiff_lzw')
    # Return the path of the last mask written (the only one in the single-image pipeline).
    return new_image_path

def process_single_image(single_image_path, base_f, pro_f, rsw_f, rsd_f):
    # Cut the image into windows, predict a mask per window, recombine the masks
    # into a full-size image and clean up the intermediate folders.
    gss_single(single_image_path, pro_f, 0, "tif", True)
    process_multifolder(get_paths_tp(pro_f), rsw_f)
    pt = recombine_windows(rsw_f, rsd_f)
    shutil.rmtree(pro_f)
    shutil.rmtree(rsw_f)

    return pt

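# Pipeline note: with the default folder names used in predict_full below and a
# hypothetical input "tmp.tif", the intermediate layout looks like this:
#   processing/tmp/tmp_<x>_<y>.png        windows cut by gss_single
#   results_windows/tmp/tmp_<x>_<y>.png   per-window masks from process_multifolder
#   results_together/tmp.tif              recombined full-size mask from recombine_windows
# The two intermediate folders are deleted once the recombined mask has been written.
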
# Load the TorchScript segmentation model and run everything on CPU.
device = torch.device("cpu")

model = torch.jit.load("modelo_marras.pth")
model = model.cpu()


def transform_image(image):
    # Convert a PIL image to a normalized tensor batch (ImageNet mean/std) on `device`.
    my_transforms = transforms.Compose([transforms.ToTensor(),
                                        transforms.Normalize(
                                            [0.485, 0.456, 0.406],
                                            [0.229, 0.224, 0.225])])
    image_aux = image
    return my_transforms(image_aux).unsqueeze(0).to(device)

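# Minimal sketch (hypothetical, not called by the app): transform_image turns a
# 400x400 PIL image into a normalized float batch of shape [1, 3, 400, 400] on `device`.
def _demo_transform_image():
    dummy = Image.fromarray(np.zeros((400, 400, 3), dtype=np.uint8))
    tensor = transform_image(dummy)
    print(tensor.shape)  # torch.Size([1, 3, 400, 400])
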
def predict(img):
    # Single-window prediction: resize to 400x400, run the model and return the
    # binary 0/255 mask as a PIL image.
    img_pil = PIL.Image.fromarray(img, 'RGB')
    image = transforms.Resize((400, 400))(img_pil)
    tensor = transform_image(image=image)
    model.to(device)
    with torch.no_grad():
        outputs = model(tensor)
    outputs = torch.argmax(outputs, 1)
    mask = np.array(outputs.cpu())
    mask[mask == 1] = 255
    mask = np.reshape(mask, (400, 400))
    return Image.fromarray(mask.astype('uint8'))

def predict_full(img):
    ruta_actual = Path(".")
    print(f"Current working directory: {ruta_actual.resolve()}")

    # Save the uploaded image (an RGB numpy array) to the temporary path the
    # windowing pipeline reads from, then run the full split/predict/recombine pipeline.
    single_image_path = "/home/user/app/tmp.tif"
    Image.fromarray(img).save(single_image_path)

    base_f = "."
    pro_f = "processing"
    rsw_f = "results_windows"
    rsd_f = "results_together"
    destpath = process_single_image(single_image_path, base_f, pro_f, rsw_f, rsd_f)
    im = Image.open(destpath)
    return im


gr.Interface(fn=predict_full, inputs=gr.Image(), outputs=gr.Image(type="pil")).launch(share=False)