#### FACE_ENHANCER.PY CODE START ### import os import cv2 import torch import gfpgan from PIL import Image from upscaler.RealESRGAN import RealESRGAN from upscaler.codeformer import CodeFormerEnhancer def gfpgan_runner(img, model): _, imgs, _ = model.enhance(img, paste_back=True, has_aligned=True) return imgs[0] def realesrgan_runner(img, model): img = model.predict(img) return img def codeformer_runner(img, model): img = model.enhance(img) return img supported_enhancers = { "CodeFormer": ("./assets/pretrained_models/codeformer.onnx", codeformer_runner), "GFPGAN": ("./assets/pretrained_models/GFPGANv1.4.pth", gfpgan_runner), "REAL-ESRGAN 2x": ("./assets/pretrained_models/RealESRGAN_x2.pth", realesrgan_runner), "REAL-ESRGAN 4x": ("./assets/pretrained_models/RealESRGAN_x4.pth", realesrgan_runner), "REAL-ESRGAN 8x": ("./assets/pretrained_models/RealESRGAN_x8.pth", realesrgan_runner) } cv2_interpolations = ["LANCZOS4", "CUBIC", "NEAREST"] def get_available_enhancer_names(): available = [] for name, data in supported_enhancers.items(): path = os.path.join(os.path.abspath(os.path.dirname(__file__)), data[0]) if os.path.exists(path): available.append(name) return available def load_face_enhancer_model(name='GFPGAN', device="cpu"): assert name in get_available_enhancer_names() + cv2_interpolations, f"Face enhancer {name} unavailable." if name in supported_enhancers.keys(): model_path, model_runner = supported_enhancers.get(name) model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), model_path) if name == 'CodeFormer': model = CodeFormerEnhancer(model_path=model_path, device=device) elif name == 'GFPGAN': model = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=device) elif name == 'REAL-ESRGAN 2x': model = RealESRGAN(device, scale=2) model.load_weights(model_path, download=False) elif name == 'REAL-ESRGAN 4x': model = RealESRGAN(device, scale=4) model.load_weights(model_path, download=False) elif name == 'REAL-ESRGAN 8x': model = RealESRGAN(device, scale=8) model.load_weights(model_path, download=False) elif name == 'LANCZOS4': model = None model_runner = lambda img, _: cv2.resize(img, (512,512), interpolation=cv2.INTER_LANCZOS4) elif name == 'CUBIC': model = None model_runner = lambda img, _: cv2.resize(img, (512,512), interpolation=cv2.INTER_CUBIC) elif name == 'NEAREST': model = None model_runner = lambda img, _: cv2.resize(img, (512,512), interpolation=cv2.INTER_NEAREST) else: model = None return (model, model_runner) #### FACE_EHNANCER.PY CODE END ### #### FACE_SWAPPER.PY CODE START ### import time import torch import onnx import cv2 import onnxruntime import numpy as np from tqdm import tqdm import torch.nn as nn from onnx import numpy_helper from skimage import transform as trans import torchvision.transforms.functional as F import torch.nn.functional as F from utils import mask_crop, laplacian_blending arcface_dst = np.array( [[38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366], [41.5493, 92.3655], [70.7299, 92.2041]], dtype=np.float32) def estimate_norm(lmk, image_size=112, mode='arcface'): assert lmk.shape == (5, 2) assert image_size % 112 == 0 or image_size % 128 == 0 if image_size % 112 == 0: ratio = float(image_size) / 112.0 diff_x = 0 else: ratio = float(image_size) / 128.0 diff_x = 8.0 * ratio dst = arcface_dst * ratio dst[:, 0] += diff_x tform = trans.SimilarityTransform() tform.estimate(lmk, dst) M = tform.params[0:2, :] return M def norm_crop2(img, landmark, image_size=112, mode='arcface'): M = estimate_norm(landmark, image_size, mode) warped = cv2.warpAffine(img, M, (image_size, image_size), borderValue=0.0) return warped, M class Inswapper(): def __init__(self, model_file=None, batch_size=32, providers=['CPUExecutionProvider']): self.model_file = model_file self.batch_size = batch_size model = onnx.load(self.model_file) graph = model.graph self.emap = numpy_helper.to_array(graph.initializer[-1]) self.session_options = onnxruntime.SessionOptions() self.session = onnxruntime.InferenceSession(self.model_file, sess_options=self.session_options, providers=providers) def forward(self, imgs, latents): preds = [] for img, latent in zip(imgs, latents): img = img / 255 pred = self.session.run(['output'], {'target': img, 'source': latent})[0] preds.append(pred) def get(self, imgs, target_faces, source_faces): imgs = list(imgs) preds = [None] * len(imgs) matrs = [None] * len(imgs) for idx, (img, target_face, source_face) in enumerate(zip(imgs, target_faces, source_faces)): matrix, blob, latent = self.prepare_data(img, target_face, source_face) pred = self.session.run(['output'], {'target': blob, 'source': latent})[0] pred = pred.transpose((0, 2, 3, 1))[0] pred = np.clip(255 * pred, 0, 255).astype(np.uint8)[:, :, ::-1] preds[idx] = pred matrs[idx] = matrix return (preds, matrs) def prepare_data(self, img, target_face, source_face): if isinstance(img, str): img = cv2.imread(img) aligned_img, matrix = norm_crop2(img, target_face.kps, 128) blob = cv2.dnn.blobFromImage(aligned_img, 1.0 / 255, (128, 128), (0., 0., 0.), swapRB=True) latent = source_face.normed_embedding.reshape((1, -1)) latent = np.dot(latent, self.emap) latent /= np.linalg.norm(latent) return (matrix, blob, latent) def batch_forward(self, img_list, target_f_list, source_f_list): num_samples = len(img_list) num_batches = (num_samples + self.batch_size - 1) // self.batch_size for i in tqdm(range(num_batches), desc="Generating face"): start_idx = i * self.batch_size end_idx = min((i + 1) * self.batch_size, num_samples) batch_img = img_list[start_idx:end_idx] batch_target_f = target_f_list[start_idx:end_idx] batch_source_f = source_f_list[start_idx:end_idx] batch_pred, batch_matr = self.get(batch_img, batch_target_f, batch_source_f) yield batch_pred, batch_matr def paste_to_whole(foreground, background, matrix, mask=None, crop_mask=(0,0,0,0), blur_amount=0.1, erode_amount = 0.15, blend_method='linear'): inv_matrix = cv2.invertAffineTransform(matrix) fg_shape = foreground.shape[:2] bg_shape = (background.shape[1], background.shape[0]) foreground = cv2.warpAffine(foreground, inv_matrix, bg_shape, borderValue=0.0) if mask is None: mask = np.full(fg_shape, 1., dtype=np.float32) mask = mask_crop(mask, crop_mask) mask = cv2.warpAffine(mask, inv_matrix, bg_shape, borderValue=0.0) else: assert fg_shape == mask.shape[:2], "foreground & mask shape mismatch!" mask = mask_crop(mask, crop_mask).astype('float32') mask = cv2.warpAffine(mask, inv_matrix, (background.shape[1], background.shape[0]), borderValue=0.0) _mask = mask.copy() _mask[_mask > 0.05] = 1. non_zero_points = cv2.findNonZero(_mask) _, _, w, h = cv2.boundingRect(non_zero_points) mask_size = int(np.sqrt(w * h)) if erode_amount > 0: kernel_size = max(int(mask_size * erode_amount), 1) structuring_element = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_size, kernel_size)) mask = cv2.erode(mask, structuring_element) if blur_amount > 0: kernel_size = max(int(mask_size * blur_amount), 3) if kernel_size % 2 == 0: kernel_size += 1 mask = cv2.GaussianBlur(mask, (kernel_size, kernel_size), 0) mask = np.tile(np.expand_dims(mask, axis=-1), (1, 1, 3)) if blend_method == 'laplacian': composite_image = laplacian_blending(foreground, background, mask.clip(0,1), num_levels=4) else: composite_image = mask * foreground + (1 - mask) * background return composite_image.astype("uint8").clip(0, 255) #### FACE_SWAPPER.PY CODE END ### #### FACE_ANALYSER.PY CODE START ### import os import cv2 import numpy as np from tqdm import tqdm from utils import scale_bbox_from_center detect_conditions = [ "best detection", "left most", "right most", "top most", "bottom most", "middle", "biggest", "smallest", ] swap_options_list = [ "All Face", "Specific Face", "Age less than", "Age greater than", "All Male", "All Female", "Left Most", "Right Most", "Top Most", "Bottom Most", "Middle", "Biggest", "Smallest", ] def get_single_face(faces, method="best detection"): total_faces = len(faces) if total_faces == 1: return faces[0] print(f"{total_faces} face detected. Using {method} face.") if method == "best detection": return sorted(faces, key=lambda face: face["det_score"])[-1] elif method == "left most": return sorted(faces, key=lambda face: face["bbox"][0])[0] elif method == "right most": return sorted(faces, key=lambda face: face["bbox"][0])[-1] elif method == "top most": return sorted(faces, key=lambda face: face["bbox"][1])[0] elif method == "bottom most": return sorted(faces, key=lambda face: face["bbox"][1])[-1] elif method == "middle": return sorted(faces, key=lambda face: ( (face["bbox"][0] + face["bbox"][2]) / 2 - 0.5) ** 2 + ((face["bbox"][1] + face["bbox"][3]) / 2 - 0.5) ** 2)[len(faces) // 2] elif method == "biggest": return sorted(faces, key=lambda face: (face["bbox"][2] - face["bbox"][0]) * (face["bbox"][3] - face["bbox"][1]))[-1] elif method == "smallest": return sorted(faces, key=lambda face: (face["bbox"][2] - face["bbox"][0]) * (face["bbox"][3] - face["bbox"][1]))[0] def analyse_face(image, model, return_single_face=True, detect_condition="best detection", scale=1.0): faces = model.get(image) if scale != 1: # landmark-scale for i, face in enumerate(faces): landmark = face['kps'] center = np.mean(landmark, axis=0) landmark = center + (landmark - center) * scale faces[i]['kps'] = landmark if not return_single_face: return faces return get_single_face(faces, method=detect_condition) def cosine_distance(a, b): a /= np.linalg.norm(a) b /= np.linalg.norm(b) return 1 - np.dot(a, b) def get_analysed_data(face_analyser, image_sequence, source_data, swap_condition="All face", detect_condition="left most", scale=1.0): if swap_condition != "Specific Face": source_path, age = source_data source_image = cv2.imread(source_path) analysed_source = analyse_face(source_image, face_analyser, return_single_face=True, detect_condition=detect_condition, scale=scale) else: analysed_source_specifics = [] source_specifics, threshold = source_data for source, specific in zip(*source_specifics): if source is None or specific is None: continue analysed_source = analyse_face(source, face_analyser, return_single_face=True, detect_condition=detect_condition, scale=scale) analysed_specific = analyse_face(specific, face_analyser, return_single_face=True, detect_condition=detect_condition, scale=scale) analysed_source_specifics.append([analysed_source, analysed_specific]) analysed_target_list = [] analysed_source_list = [] whole_frame_eql_list = [] num_faces_per_frame = [] total_frames = len(image_sequence) curr_idx = 0 for curr_idx, frame_path in tqdm(enumerate(image_sequence), total=total_frames, desc="Analysing face data"): frame = cv2.imread(frame_path) analysed_faces = analyse_face(frame, face_analyser, return_single_face=False, detect_condition=detect_condition, scale=scale) n_faces = 0 for analysed_face in analysed_faces: if swap_condition == "All Face": analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "Age less than" and analysed_face["age"] < age: analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "Age greater than" and analysed_face["age"] > age: analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "All Male" and analysed_face["gender"] == 1: analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "All Female" and analysed_face["gender"] == 0: analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "Specific Face": for analysed_source, analysed_specific in analysed_source_specifics: distance = cosine_distance(analysed_specific["embedding"], analysed_face["embedding"]) if distance < threshold: analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 if swap_condition == "Left Most": analysed_face = get_single_face(analysed_faces, method="left most") analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "Right Most": analysed_face = get_single_face(analysed_faces, method="right most") analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "Top Most": analysed_face = get_single_face(analysed_faces, method="top most") analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "Bottom Most": analysed_face = get_single_face(analysed_faces, method="bottom most") analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "Middle": analysed_face = get_single_face(analysed_faces, method="middle") analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "Biggest": analysed_face = get_single_face(analysed_faces, method="biggest") analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 elif swap_condition == "Smallest": analysed_face = get_single_face(analysed_faces, method="smallest") analysed_target_list.append(analysed_face) analysed_source_list.append(analysed_source) whole_frame_eql_list.append(frame_path) n_faces += 1 num_faces_per_frame.append(n_faces) return analysed_target_list, analysed_source_list, whole_frame_eql_list, num_faces_per_frame #### FACE_ANALYSER.PY CODE END ### #### UTILS.PY CODE START ### import os import cv2 import time import glob import shutil import platform import datetime import subprocess import numpy as np from threading import Thread from moviepy.editor import VideoFileClip, ImageSequenceClip from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip logo_image = cv2.imread("./assets/images/logo.png", cv2.IMREAD_UNCHANGED) quality_types = ["poor", "low", "medium", "high", "best"] bitrate_quality_by_resolution = { 240: {"poor": "300k", "low": "500k", "medium": "800k", "high": "1000k", "best": "1200k"}, 360: {"poor": "500k","low": "800k","medium": "1200k","high": "1500k","best": "2000k"}, 480: {"poor": "800k","low": "1200k","medium": "2000k","high": "2500k","best": "3000k"}, 720: {"poor": "1500k","low": "2500k","medium": "4000k","high": "5000k","best": "6000k"}, 1080: {"poor": "2500k","low": "4000k","medium": "6000k","high": "7000k","best": "8000k"}, 1440: {"poor": "4000k","low": "6000k","medium": "8000k","high": "10000k","best": "12000k"}, 2160: {"poor": "8000k","low": "10000k","medium": "12000k","high": "15000k","best": "20000k"} } crf_quality_by_resolution = { 240: {"poor": 45, "low": 35, "medium": 28, "high": 23, "best": 20}, 360: {"poor": 35, "low": 28, "medium": 23, "high": 20, "best": 18}, 480: {"poor": 28, "low": 23, "medium": 20, "high": 18, "best": 16}, 720: {"poor": 23, "low": 20, "medium": 18, "high": 16, "best": 14}, 1080: {"poor": 20, "low": 18, "medium": 16, "high": 14, "best": 12}, 1440: {"poor": 18, "low": 16, "medium": 14, "high": 12, "best": 10}, 2160: {"poor": 16, "low": 14, "medium": 12, "high": 10, "best": 8} } def get_bitrate_for_resolution(resolution, quality): available_resolutions = list(bitrate_quality_by_resolution.keys()) closest_resolution = min(available_resolutions, key=lambda x: abs(x - resolution)) return bitrate_quality_by_resolution[closest_resolution][quality] def get_crf_for_resolution(resolution, quality): available_resolutions = list(crf_quality_by_resolution.keys()) closest_resolution = min(available_resolutions, key=lambda x: abs(x - resolution)) return crf_quality_by_resolution[closest_resolution][quality] def get_video_bitrate(video_file): ffprobe_cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=bit_rate', '-of', 'default=noprint_wrappers=1:nokey=1', video_file] result = subprocess.run(ffprobe_cmd, stdout=subprocess.PIPE) kbps = max(int(result.stdout) // 1000, 10) return str(kbps) + 'k' def trim_video(video_path, output_path, start_frame, stop_frame): video_name, _ = os.path.splitext(os.path.basename(video_path)) trimmed_video_filename = video_name + "_trimmed" + ".mp4" temp_path = os.path.join(output_path, "trim") os.makedirs(temp_path, exist_ok=True) trimmed_video_file_path = os.path.join(temp_path, trimmed_video_filename) video = VideoFileClip(video_path, fps_source="fps") fps = video.fps start_time = start_frame / fps duration = (stop_frame - start_frame) / fps bitrate = get_bitrate_for_resolution(min(*video.size), "high") trimmed_video = video.subclip(start_time, start_time + duration) trimmed_video.write_videofile( trimmed_video_file_path, codec="libx264", audio_codec="aac", bitrate=bitrate, ) trimmed_video.close() video.close() return trimmed_video_file_path def open_directory(path=None): if path is None: return try: os.startfile(path) except: subprocess.Popen(["xdg-open", path]) class StreamerThread(object): def __init__(self, src=0): self.capture = cv2.VideoCapture(src) self.capture.set(cv2.CAP_PROP_BUFFERSIZE, 2) self.FPS = 1 / 30 self.FPS_MS = int(self.FPS * 1000) self.thread = None self.stopped = False self.frame = None def start(self): self.thread = Thread(target=self.update, args=()) self.thread.daemon = True self.thread.start() def stop(self): self.stopped = True self.thread.join() print("stopped") def update(self): while not self.stopped: if self.capture.isOpened(): (self.status, self.frame) = self.capture.read() time.sleep(self.FPS) class ProcessBar: def __init__(self, bar_length, total, before="⬛", after="🟨"): self.bar_length = bar_length self.total = total self.before = before self.after = after self.bar = [self.before] * bar_length self.start_time = time.time() def get(self, index): total = self.total elapsed_time = time.time() - self.start_time average_time_per_iteration = elapsed_time / (index + 1) remaining_iterations = total - (index + 1) estimated_remaining_time = remaining_iterations * average_time_per_iteration self.bar[int(index / total * self.bar_length)] = self.after info_text = f"({index+1}/{total}) {''.join(self.bar)} " info_text += f"(ETR: {int(estimated_remaining_time // 60)} min {int(estimated_remaining_time % 60)} sec)" return info_text def add_logo_to_image(img, logo=logo_image): logo_size = int(img.shape[1] * 0.1) logo = cv2.resize(logo, (logo_size, logo_size)) if logo.shape[2] == 4: alpha = logo[:, :, 3] else: alpha = np.ones_like(logo[:, :, 0]) * 255 padding = int(logo_size * 0.1) roi = img.shape[0] - logo_size - padding, img.shape[1] - logo_size - padding for c in range(0, 3): img[roi[0] : roi[0] + logo_size, roi[1] : roi[1] + logo_size, c] = ( alpha / 255.0 ) * logo[:, :, c] + (1 - alpha / 255.0) * img[ roi[0] : roi[0] + logo_size, roi[1] : roi[1] + logo_size, c ] return img def split_list_by_lengths(data, length_list): split_data = [] start_idx = 0 for length in length_list: end_idx = start_idx + length sublist = data[start_idx:end_idx] split_data.append(sublist) start_idx = end_idx return split_data def merge_img_sequence_from_ref(ref_video_path, image_sequence, output_file_name): video_clip = VideoFileClip(ref_video_path, fps_source="fps") fps = video_clip.fps duration = video_clip.duration total_frames = video_clip.reader.nframes audio_clip = video_clip.audio if video_clip.audio is not None else None edited_video_clip = ImageSequenceClip(image_sequence, fps=fps) if audio_clip is not None: edited_video_clip = edited_video_clip.set_audio(audio_clip) bitrate = get_bitrate_for_resolution(min(*edited_video_clip.size), "high") edited_video_clip.set_duration(duration).write_videofile( output_file_name, codec="libx264", bitrate=bitrate, ) edited_video_clip.close() video_clip.close() def scale_bbox_from_center(bbox, scale_width, scale_height, image_width, image_height): # Extract the coordinates of the bbox x1, y1, x2, y2 = bbox # Calculate the center point of the bbox center_x = (x1 + x2) / 2 center_y = (y1 + y2) / 2 # Calculate the new width and height of the bbox based on the scaling factors width = x2 - x1 height = y2 - y1 new_width = width * scale_width new_height = height * scale_height # Calculate the new coordinates of the bbox, considering the image boundaries new_x1 = center_x - new_width / 2 new_y1 = center_y - new_height / 2 new_x2 = center_x + new_width / 2 new_y2 = center_y + new_height / 2 # Adjust the coordinates to ensure the bbox remains within the image boundaries new_x1 = max(0, new_x1) new_y1 = max(0, new_y1) new_x2 = min(image_width - 1, new_x2) new_y2 = min(image_height - 1, new_y2) # Return the scaled bbox coordinates scaled_bbox = [new_x1, new_y1, new_x2, new_y2] return scaled_bbox def laplacian_blending(A, B, m, num_levels=7): assert A.shape == B.shape assert B.shape == m.shape height = m.shape[0] width = m.shape[1] size_list = np.array([4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192]) size = size_list[np.where(size_list > max(height, width))][0] GA = np.zeros((size, size, 3), dtype=np.float32) GA[:height, :width, :] = A GB = np.zeros((size, size, 3), dtype=np.float32) GB[:height, :width, :] = B GM = np.zeros((size, size, 3), dtype=np.float32) GM[:height, :width, :] = m gpA = [GA] gpB = [GB] gpM = [GM] for i in range(num_levels): GA = cv2.pyrDown(GA) GB = cv2.pyrDown(GB) GM = cv2.pyrDown(GM) gpA.append(np.float32(GA)) gpB.append(np.float32(GB)) gpM.append(np.float32(GM)) lpA = [gpA[num_levels-1]] lpB = [gpB[num_levels-1]] gpMr = [gpM[num_levels-1]] for i in range(num_levels-1,0,-1): LA = np.subtract(gpA[i-1], cv2.pyrUp(gpA[i])) LB = np.subtract(gpB[i-1], cv2.pyrUp(gpB[i])) lpA.append(LA) lpB.append(LB) gpMr.append(gpM[i-1]) LS = [] for la,lb,gm in zip(lpA,lpB,gpMr): ls = la * gm + lb * (1.0 - gm) LS.append(ls) ls_ = LS[0] for i in range(1,num_levels): ls_ = cv2.pyrUp(ls_) ls_ = cv2.add(ls_, LS[i]) ls_ = ls_[:height, :width, :] #ls_ = (ls_ - np.min(ls_)) * (255.0 / (np.max(ls_) - np.min(ls_))) return ls_.clip(0, 255) def mask_crop(mask, crop): top, bottom, left, right = crop shape = mask.shape top = int(top) bottom = int(bottom) if top + bottom < shape[1]: if top > 0: mask[:top, :] = 0 if bottom > 0: mask[-bottom:, :] = 0 left = int(left) right = int(right) if left + right < shape[0]: if left > 0: mask[:, :left] = 0 if right > 0: mask[:, -right:] = 0 return mask def create_image_grid(images, size=128): num_images = len(images) num_cols = int(np.ceil(np.sqrt(num_images))) num_rows = int(np.ceil(num_images / num_cols)) grid = np.zeros((num_rows * size, num_cols * size, 3), dtype=np.uint8) for i, image in enumerate(images): row_idx = (i // num_cols) * size col_idx = (i % num_cols) * size image = cv2.resize(image.copy(), (size,size)) if image.dtype != np.uint8: image = (image.astype('float32') * 255).astype('uint8') if image.ndim == 2: image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) grid[row_idx:row_idx + size, col_idx:col_idx + size] = image return grid #### UTILS.PY CODE END ### #### APP.PY CODE END ### import os import spaces import cv2 import glob import time import torch import shutil import argparse import platform import datetime import subprocess import insightface import onnxruntime import numpy as np import gradio as gr import threading import queue from tqdm import tqdm import concurrent.futures from moviepy.editor import VideoFileClip from nsfw_checker import NSFWChecker from face_swapper import Inswapper, paste_to_whole from face_analyser import detect_conditions, get_analysed_data, swap_options_list from face_parsing import init_parsing_model, get_parsed_mask, mask_regions, mask_regions_to_list from face_enhancer import get_available_enhancer_names, load_face_enhancer_model, cv2_interpolations from utils import trim_video, StreamerThread, ProcessBar, open_directory, split_list_by_lengths, merge_img_sequence_from_ref, create_image_grid ## ------------------------------ USER ARGS ------------------------------ parser = argparse.ArgumentParser(description="Swap-Mukham Face Swapper") parser.add_argument("--out_dir", help="Default Output directory", default=os.getcwd()) parser.add_argument("--batch_size", help="Gpu batch size", default=32) parser.add_argument("--cuda", action="store_true", help="Enable cuda", default=False) parser.add_argument( "--colab", action="store_true", help="Enable colab mode", default=False ) user_args = parser.parse_args() ## ------------------------------ DEFAULTS ------------------------------ USE_COLAB = user_args.colab USE_CUDA = user_args.cuda DEF_OUTPUT_PATH = user_args.out_dir BATCH_SIZE = int(user_args.batch_size) WORKSPACE = None OUTPUT_FILE = None CURRENT_FRAME = None STREAMER = None DETECT_CONDITION = "best detection" DETECT_SIZE = 640 DETECT_THRESH = 0.6 NUM_OF_SRC_SPECIFIC = 10 MASK_INCLUDE = [ "Skin", "R-Eyebrow", "L-Eyebrow", "L-Eye", "R-Eye", "Nose", "Mouth", "L-Lip", "U-Lip" ] MASK_SOFT_KERNEL = 17 MASK_SOFT_ITERATIONS = 10 MASK_BLUR_AMOUNT = 0.1 MASK_ERODE_AMOUNT = 0.15 FACE_SWAPPER = None FACE_ANALYSER = None FACE_ENHANCER = None FACE_PARSER = None NSFW_DETECTOR = None FACE_ENHANCER_LIST = ["NONE"] FACE_ENHANCER_LIST.extend(get_available_enhancer_names()) FACE_ENHANCER_LIST.extend(cv2_interpolations) ## ------------------------------ SET EXECUTION PROVIDER ------------------------------ # Note: Non CUDA users may change settings here PROVIDER = ["CPUExecutionProvider"] if USE_CUDA: available_providers = onnxruntime.get_available_providers() if "CUDAExecutionProvider" in available_providers: print("\n********** Running on CUDA **********\n") PROVIDER = ["CUDAExecutionProvider", "CPUExecutionProvider"] else: USE_CUDA = False print("\n********** CUDA unavailable running on CPU **********\n") else: USE_CUDA = False print("\n********** Running on CPU **********\n") device = "cuda" if USE_CUDA else "cpu" EMPTY_CACHE = lambda: torch.cuda.empty_cache() if device == "cuda" else None ## ------------------------------ LOAD MODELS ------------------------------ def load_face_analyser_model(name="buffalo_l"): global FACE_ANALYSER if FACE_ANALYSER is None: FACE_ANALYSER = insightface.app.FaceAnalysis(name=name, providers=PROVIDER) FACE_ANALYSER.prepare( ctx_id=0, det_size=(DETECT_SIZE, DETECT_SIZE), det_thresh=DETECT_THRESH ) def load_face_swapper_model(path="./assets/pretrained_models/inswapper_128.onnx"): global FACE_SWAPPER if FACE_SWAPPER is None: batch = int(BATCH_SIZE) if device == "cuda" else 1 FACE_SWAPPER = Inswapper(model_file=path, batch_size=batch, providers=PROVIDER) def load_face_parser_model(path="./assets/pretrained_models/79999_iter.pth"): global FACE_PARSER if FACE_PARSER is None: FACE_PARSER = init_parsing_model(path, device=device) def load_nsfw_detector_model(path="./assets/pretrained_models/open-nsfw.onnx"): global NSFW_DETECTOR if NSFW_DETECTOR is None: NSFW_DETECTOR = NSFWChecker(model_path=path, providers=PROVIDER) load_face_analyser_model() load_face_swapper_model() ## ------------------------------ MAIN PROCESS ------------------------------ @spaces.GPU(duration=300, enable_queue=True) def process( input_type, image_path, video_path, directory_path, source_path, output_path, output_name, keep_output_sequence, condition, age, distance, face_enhancer_name, enable_face_parser, mask_includes, mask_soft_kernel, mask_soft_iterations, blur_amount, erode_amount, face_scale, enable_laplacian_blend, crop_top, crop_bott, crop_left, crop_right, *specifics, ): global WORKSPACE global OUTPUT_FILE global PREVIEW WORKSPACE, OUTPUT_FILE, PREVIEW = None, None, None ## ------------------------------ GUI UPDATE FUNC ------------------------------ def ui_before(): return ( gr.update(visible=True, value=PREVIEW), gr.update(interactive=False), gr.update(interactive=False), gr.update(visible=False), ) def ui_after(): return ( gr.update(visible=True, value=PREVIEW), gr.update(interactive=True), gr.update(interactive=True), gr.update(visible=False), ) def ui_after_vid(): return ( gr.update(visible=False), gr.update(interactive=True), gr.update(interactive=True), gr.update(value=OUTPUT_FILE, visible=True), ) start_time = time.time() total_exec_time = lambda start_time: divmod(time.time() - start_time, 60) get_finsh_text = lambda start_time: f"✔️ Completed in {int(total_exec_time(start_time)[0])} min {int(total_exec_time(start_time)[1])} sec." ## ------------------------------ PREPARE INPUTS & LOAD MODELS ------------------------------ yield "### \n ⌛ Loading NSFW detector model...", *ui_before() load_nsfw_detector_model() yield "### \n ⌛ Loading face analyser model...", *ui_before() load_face_analyser_model() yield "### \n ⌛ Loading face swapper model...", *ui_before() load_face_swapper_model() if face_enhancer_name != "NONE": if face_enhancer_name not in cv2_interpolations: yield f"### \n ⌛ Loading {face_enhancer_name} model...", *ui_before() FACE_ENHANCER = load_face_enhancer_model(name=face_enhancer_name, device=device) else: FACE_ENHANCER = None if enable_face_parser: yield "### \n ⌛ Loading face parsing model...", *ui_before() load_face_parser_model() includes = mask_regions_to_list(mask_includes) specifics = list(specifics) half = len(specifics) // 2 sources = specifics[:half] specifics = specifics[half:] if crop_top > crop_bott: crop_top, crop_bott = crop_bott, crop_top if crop_left > crop_right: crop_left, crop_right = crop_right, crop_left crop_mask = (crop_top, 511-crop_bott, crop_left, 511-crop_right) def swap_process(image_sequence): ## ------------------------------ CONTENT CHECK ------------------------------ yield "### \n ⌛ Checking contents...", *ui_before() nsfw = NSFW_DETECTOR.is_nsfw(image_sequence) if nsfw: message = "NSFW Content detected !!!" yield f"### \n 🔞 {message}", *ui_before() assert not nsfw, message return False EMPTY_CACHE() ## ------------------------------ ANALYSE FACE ------------------------------ yield "### \n ⌛ Analysing face data...", *ui_before() if condition != "Specific Face": source_data = source_path, age else: source_data = ((sources, specifics), distance) analysed_targets, analysed_sources, whole_frame_list, num_faces_per_frame = get_analysed_data( FACE_ANALYSER, image_sequence, source_data, swap_condition=condition, detect_condition=DETECT_CONDITION, scale=face_scale ) ## ------------------------------ SWAP FUNC ------------------------------ yield "### \n ⌛ Generating faces...", *ui_before() preds = [] matrs = [] count = 0 global PREVIEW for batch_pred, batch_matr in FACE_SWAPPER.batch_forward(whole_frame_list, analysed_targets, analysed_sources): preds.extend(batch_pred) matrs.extend(batch_matr) EMPTY_CACHE() count += 1 if USE_CUDA: image_grid = create_image_grid(batch_pred, size=128) PREVIEW = image_grid[:, :, ::-1] yield f"### \n ⌛ Generating face Batch {count}", *ui_before() ## ------------------------------ FACE ENHANCEMENT ------------------------------ generated_len = len(preds) if face_enhancer_name != "NONE": yield f"### \n ⌛ Upscaling faces with {face_enhancer_name}...", *ui_before() for idx, pred in tqdm(enumerate(preds), total=generated_len, desc=f"Upscaling with {face_enhancer_name}"): enhancer_model, enhancer_model_runner = FACE_ENHANCER pred = enhancer_model_runner(pred, enhancer_model) preds[idx] = cv2.resize(pred, (512,512)) EMPTY_CACHE() ## ------------------------------ FACE PARSING ------------------------------ if enable_face_parser: yield "### \n ⌛ Face-parsing mask...", *ui_before() masks = [] count = 0 for batch_mask in get_parsed_mask(FACE_PARSER, preds, classes=includes, device=device, batch_size=BATCH_SIZE, softness=int(mask_soft_iterations)): masks.append(batch_mask) EMPTY_CACHE() count += 1 if len(batch_mask) > 1: image_grid = create_image_grid(batch_mask, size=128) PREVIEW = image_grid[:, :, ::-1] yield f"### \n ⌛ Face parsing Batch {count}", *ui_before() masks = np.concatenate(masks, axis=0) if len(masks) >= 1 else masks else: masks = [None] * generated_len ## ------------------------------ SPLIT LIST ------------------------------ split_preds = split_list_by_lengths(preds, num_faces_per_frame) del preds split_matrs = split_list_by_lengths(matrs, num_faces_per_frame) del matrs split_masks = split_list_by_lengths(masks, num_faces_per_frame) del masks ## ------------------------------ PASTE-BACK ------------------------------ yield "### \n ⌛ Pasting back...", *ui_before() def post_process(frame_idx, frame_img, split_preds, split_matrs, split_masks, enable_laplacian_blend, crop_mask, blur_amount, erode_amount): whole_img_path = frame_img whole_img = cv2.imread(whole_img_path) blend_method = 'laplacian' if enable_laplacian_blend else 'linear' for p, m, mask in zip(split_preds[frame_idx], split_matrs[frame_idx], split_masks[frame_idx]): p = cv2.resize(p, (512,512)) mask = cv2.resize(mask, (512,512)) if mask is not None else None m /= 0.25 whole_img = paste_to_whole(p, whole_img, m, mask=mask, crop_mask=crop_mask, blend_method=blend_method, blur_amount=blur_amount, erode_amount=erode_amount) cv2.imwrite(whole_img_path, whole_img) def concurrent_post_process(image_sequence, *args): with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for idx, frame_img in enumerate(image_sequence): future = executor.submit(post_process, idx, frame_img, *args) futures.append(future) for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Pasting back"): result = future.result() concurrent_post_process( image_sequence, split_preds, split_matrs, split_masks, enable_laplacian_blend, crop_mask, blur_amount, erode_amount ) ## ------------------------------ IMAGE ------------------------------ if input_type == "Image": target = cv2.imread(image_path) output_file = os.path.join(output_path, output_name + ".png") cv2.imwrite(output_file, target) for info_update in swap_process([output_file]): yield info_update OUTPUT_FILE = output_file WORKSPACE = output_path PREVIEW = cv2.imread(output_file)[:, :, ::-1] yield get_finsh_text(start_time), *ui_after() ## ------------------------------ VIDEO ------------------------------ elif input_type == "Video": temp_path = os.path.join(output_path, output_name, "sequence") os.makedirs(temp_path, exist_ok=True) yield "### \n ⌛ Extracting video frames...", *ui_before() image_sequence = [] cap = cv2.VideoCapture(video_path) curr_idx = 0 while True: ret, frame = cap.read() if not ret:break frame_path = os.path.join(temp_path, f"frame_{curr_idx}.jpg") cv2.imwrite(frame_path, frame) image_sequence.append(frame_path) curr_idx += 1 cap.release() cv2.destroyAllWindows() for info_update in swap_process(image_sequence): yield info_update yield "### \n ⌛ Merging sequence...", *ui_before() output_video_path = os.path.join(output_path, output_name + ".mp4") merge_img_sequence_from_ref(video_path, image_sequence, output_video_path) if os.path.exists(temp_path) and not keep_output_sequence: yield "### \n ⌛ Removing temporary files...", *ui_before() shutil.rmtree(temp_path) WORKSPACE = output_path OUTPUT_FILE = output_video_path yield get_finsh_text(start_time), *ui_after_vid() ## ------------------------------ DIRECTORY ------------------------------ elif input_type == "Directory": extensions = ["jpg", "jpeg", "png", "bmp", "tiff", "ico", "webp"] temp_path = os.path.join(output_path, output_name) if os.path.exists(temp_path): shutil.rmtree(temp_path) os.mkdir(temp_path) file_paths =[] for file_path in glob.glob(os.path.join(directory_path, "*")): if any(file_path.lower().endswith(ext) for ext in extensions): img = cv2.imread(file_path) new_file_path = os.path.join(temp_path, os.path.basename(file_path)) cv2.imwrite(new_file_path, img) file_paths.append(new_file_path) for info_update in swap_process(file_paths): yield info_update PREVIEW = cv2.imread(file_paths[-1])[:, :, ::-1] WORKSPACE = temp_path OUTPUT_FILE = file_paths[-1] yield get_finsh_text(start_time), *ui_after() ## ------------------------------ STREAM ------------------------------ elif input_type == "Stream": pass ## ------------------------------ GRADIO FUNC ------------------------------ def update_radio(value): if value == "Image": return ( gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ) elif value == "Video": return ( gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), ) elif value == "Directory": return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), ) elif value == "Stream": return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), ) def swap_option_changed(value): if value.startswith("Age"): return ( gr.update(visible=True), gr.update(visible=False), gr.update(visible=True), ) elif value == "Specific Face": return ( gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), ) return gr.update(visible=False), gr.update(visible=False), gr.update(visible=True) def video_changed(video_path): sliders_update = gr.Slider.update button_update = gr.Button.update number_update = gr.Number.update if video_path is None: return ( sliders_update(minimum=0, maximum=0, value=0), sliders_update(minimum=1, maximum=1, value=1), number_update(value=1), ) try: clip = VideoFileClip(video_path) fps = clip.fps total_frames = clip.reader.nframes clip.close() return ( sliders_update(minimum=0, maximum=total_frames, value=0, interactive=True), sliders_update( minimum=0, maximum=total_frames, value=total_frames, interactive=True ), number_update(value=fps), ) except: return ( sliders_update(value=0), sliders_update(value=0), number_update(value=1), ) def analyse_settings_changed(detect_condition, detection_size, detection_threshold): yield "### \n ⌛ Applying new values..." global FACE_ANALYSER global DETECT_CONDITION DETECT_CONDITION = detect_condition FACE_ANALYSER = insightface.app.FaceAnalysis(name="buffalo_l", providers=PROVIDER) FACE_ANALYSER.prepare( ctx_id=0, det_size=(int(detection_size), int(detection_size)), det_thresh=float(detection_threshold), ) yield f"### \n ✔️ Applied detect condition:{detect_condition}, detection size: {detection_size}, detection threshold: {detection_threshold}" def stop_running(): global STREAMER if hasattr(STREAMER, "stop"): STREAMER.stop() STREAMER = None return "Cancelled" def slider_changed(show_frame, video_path, frame_index): if not show_frame: return None, None if video_path is None: return None, None clip = VideoFileClip(video_path) frame = clip.get_frame(frame_index / clip.fps) frame_array = np.array(frame) clip.close() return gr.Image.update(value=frame_array, visible=True), gr.Video.update( visible=False ) def trim_and_reload(video_path, output_path, output_name, start_frame, stop_frame): yield video_path, f"### \n ⌛ Trimming video frame {start_frame} to {stop_frame}..." try: output_path = os.path.join(output_path, output_name) trimmed_video = trim_video(video_path, output_path, start_frame, stop_frame) yield trimmed_video, "### \n ✔️ Video trimmed and reloaded." except Exception as e: print(e) yield video_path, "### \n ❌ Video trimming failed. See console for more info." ## ------------------------------ GRADIO GUI ------------------------------ css = """ footer{display:none !important} """ with gr.Blocks(css=css) as interface: gr.Markdown("# 🗿 Swap Mukham") gr.Markdown("### Face swap app based on insightface inswapper.") with gr.Row(): with gr.Row(): with gr.Column(scale=0.4): with gr.Tab("📄 Swap Condition"): swap_option = gr.Dropdown( swap_options_list, info="Choose which face or faces in the target image to swap.", multiselect=False, show_label=False, value=swap_options_list[0], interactive=True, ) age = gr.Number( value=25, label="Value", interactive=True, visible=False ) with gr.Tab("🎚️ Detection Settings"): detect_condition_dropdown = gr.Dropdown( detect_conditions, label="Condition", value=DETECT_CONDITION, interactive=True, info="This condition is only used when multiple faces are detected on source or specific image.", ) detection_size = gr.Number( label="Detection Size", value=DETECT_SIZE, interactive=True ) detection_threshold = gr.Number( label="Detection Threshold", value=DETECT_THRESH, interactive=True, ) apply_detection_settings = gr.Button("Apply settings") with gr.Tab("📤 Output Settings"): output_directory = gr.Text( label="Output Directory", value=DEF_OUTPUT_PATH, interactive=True, ) output_name = gr.Text( label="Output Name", value="Result", interactive=True ) keep_output_sequence = gr.Checkbox( label="Keep output sequence", value=False, interactive=True ) with gr.Tab("🪄 Other Settings"): face_scale = gr.Slider( label="Face Scale", minimum=0, maximum=2, value=1, interactive=True, ) face_enhancer_name = gr.Dropdown( FACE_ENHANCER_LIST, label="Face Enhancer", value="NONE", multiselect=False, interactive=True ) with gr.Accordion("Advanced Mask", open=False): enable_face_parser_mask = gr.Checkbox( label="Enable Face Parsing", value=False, interactive=True, ) mask_include = gr.Dropdown( mask_regions.keys(), value=MASK_INCLUDE, multiselect=True, label="Include", interactive=True, ) mask_soft_kernel = gr.Number( label="Soft Erode Kernel", value=MASK_SOFT_KERNEL, minimum=3, interactive=True, visible = False ) mask_soft_iterations = gr.Number( label="Soft Erode Iterations", value=MASK_SOFT_ITERATIONS, minimum=0, interactive=True, ) with gr.Accordion("Crop Mask", open=False): crop_top = gr.Slider(label="Top", minimum=0, maximum=511, value=0, step=1, interactive=True) crop_bott = gr.Slider(label="Bottom", minimum=0, maximum=511, value=511, step=1, interactive=True) crop_left = gr.Slider(label="Left", minimum=0, maximum=511, value=0, step=1, interactive=True) crop_right = gr.Slider(label="Right", minimum=0, maximum=511, value=511, step=1, interactive=True) erode_amount = gr.Slider( label="Mask Erode", minimum=0, maximum=1, value=MASK_ERODE_AMOUNT, step=0.05, interactive=True, ) blur_amount = gr.Slider( label="Mask Blur", minimum=0, maximum=1, value=MASK_BLUR_AMOUNT, step=0.05, interactive=True, ) enable_laplacian_blend = gr.Checkbox( label="Laplacian Blending", value=True, interactive=True, ) source_image_input = gr.Image( label="Source face", type="filepath", interactive=True ) with gr.Group(visible=False) as specific_face: for i in range(NUM_OF_SRC_SPECIFIC): idx = i + 1 code = "\n" code += f"with gr.Tab(label='({idx})'):" code += "\n\twith gr.Row():" code += f"\n\t\tsrc{idx} = gr.Image(interactive=True, type='numpy', label='Source Face {idx}')" code += f"\n\t\ttrg{idx} = gr.Image(interactive=True, type='numpy', label='Specific Face {idx}')" exec(code) distance_slider = gr.Slider( minimum=0, maximum=2, value=0.6, interactive=True, label="Distance", info="Lower distance is more similar and higher distance is less similar to the target face.", ) with gr.Group(): input_type = gr.Radio( ["Image", "Video"], label="Target Type", value="Image", ) with gr.Group(visible=True) as input_image_group: image_input = gr.Image( label="Target Image", interactive=True, type="filepath" ) with gr.Group(visible=False) as input_video_group: vid_widget = gr.Video if USE_COLAB else gr.Text video_input = gr.Video( label="Target Video", interactive=True ) with gr.Accordion("✂️ Trim video", open=False): with gr.Column(): with gr.Row(): set_slider_range_btn = gr.Button( "Set frame range", interactive=True ) show_trim_preview_btn = gr.Checkbox( label="Show frame when slider change", value=True, interactive=True, ) video_fps = gr.Number( value=30, interactive=False, label="Fps", visible=False, ) start_frame = gr.Slider( minimum=0, maximum=1, value=0, step=1, interactive=True, label="Start Frame", info="", ) end_frame = gr.Slider( minimum=0, maximum=1, value=1, step=1, interactive=True, label="End Frame", info="", ) trim_and_reload_btn = gr.Button( "Trim and Reload", interactive=True ) with gr.Group(visible=False) as input_directory_group: direc_input = gr.Text(label="Path", interactive=True) with gr.Column(scale=0.6): info = gr.Markdown(value="...") with gr.Row(): swap_button = gr.Button("✨ Swap", variant="primary") cancel_button = gr.Button("⛔ Cancel") preview_image = gr.Image(label="Output", interactive=False) preview_video = gr.Video( label="Output", interactive=False, visible=False ) with gr.Row(): output_directory_button = gr.Button( "📂", interactive=False, visible=False ) output_video_button = gr.Button( "🎬", interactive=False, visible=False ) with gr.Group(): with gr.Row(): gr.Markdown( "### [🤝 Sponsor](https://github.com/sponsors/harisreedhar)" ) gr.Markdown( "### [👨‍💻 Source code](https://github.com/harisreedhar/Swap-Mukham)" ) gr.Markdown( "### [⚠️ Disclaimer](https://github.com/harisreedhar/Swap-Mukham#disclaimer)" ) gr.Markdown( "### [🌐 Run in Colab](https://colab.research.google.com/github/harisreedhar/Swap-Mukham/blob/main/swap_mukham_colab.ipynb)" ) gr.Markdown( "### [🤗 Acknowledgements](https://github.com/harisreedhar/Swap-Mukham#acknowledgements)" ) ## ------------------------------ GRADIO EVENTS ------------------------------ set_slider_range_event = set_slider_range_btn.click( video_changed, inputs=[video_input], outputs=[start_frame, end_frame, video_fps], ) trim_and_reload_event = trim_and_reload_btn.click( fn=trim_and_reload, inputs=[video_input, output_directory, output_name, start_frame, end_frame], outputs=[video_input, info], ) start_frame_event = start_frame.release( fn=slider_changed, inputs=[show_trim_preview_btn, video_input, start_frame], outputs=[preview_image, preview_video], show_progress=True, ) end_frame_event = end_frame.release( fn=slider_changed, inputs=[show_trim_preview_btn, video_input, end_frame], outputs=[preview_image, preview_video], show_progress=True, ) input_type.change( update_radio, inputs=[input_type], outputs=[input_image_group, input_video_group, input_directory_group], ) swap_option.change( swap_option_changed, inputs=[swap_option], outputs=[age, specific_face, source_image_input], ) apply_detection_settings.click( analyse_settings_changed, inputs=[detect_condition_dropdown, detection_size, detection_threshold], outputs=[info], ) src_specific_inputs = [] gen_variable_txt = ",".join( [f"src{i+1}" for i in range(NUM_OF_SRC_SPECIFIC)] + [f"trg{i+1}" for i in range(NUM_OF_SRC_SPECIFIC)] ) exec(f"src_specific_inputs = ({gen_variable_txt})") swap_inputs = [ input_type, image_input, video_input, direc_input, source_image_input, output_directory, output_name, keep_output_sequence, swap_option, age, distance_slider, face_enhancer_name, enable_face_parser_mask, mask_include, mask_soft_kernel, mask_soft_iterations, blur_amount, erode_amount, face_scale, enable_laplacian_blend, crop_top, crop_bott, crop_left, crop_right, *src_specific_inputs, ] swap_outputs = [ info, preview_image, output_directory_button, output_video_button, preview_video, ] swap_event = swap_button.click( fn=process, inputs=swap_inputs, outputs=swap_outputs, show_progress=True ) cancel_button.click( fn=stop_running, inputs=None, outputs=[info], cancels=[ swap_event, trim_and_reload_event, set_slider_range_event, start_frame_event, end_frame_event, ], show_progress=True, ) output_directory_button.click( lambda: open_directory(path=WORKSPACE), inputs=None, outputs=None ) output_video_button.click( lambda: open_directory(path=OUTPUT_FILE), inputs=None, outputs=None ) if __name__ == "__main__": if USE_COLAB: print("Running in colab mode") interface.launch() #### APP.PY CODE END ###