|
#### FACE_ENHANCER.PY CODE START ### |
|
|
|
import os |
|
import cv2 |
|
import torch |
|
import gfpgan |
|
from PIL import Image |
|
from upscaler.RealESRGAN import RealESRGAN |
|
from upscaler.codeformer import CodeFormerEnhancer |
|
|
|
def gfpgan_runner(img, model): |
|
_, imgs, _ = model.enhance(img, paste_back=True, has_aligned=True) |
|
return imgs[0] |
|
|
|
|
|
def realesrgan_runner(img, model): |
|
img = model.predict(img) |
|
return img |
|
|
|
|
|
def codeformer_runner(img, model): |
|
img = model.enhance(img) |
|
return img |
|
|
|
|
|
supported_enhancers = { |
|
"CodeFormer": ("./assets/pretrained_models/codeformer.onnx", codeformer_runner), |
|
"GFPGAN": ("./assets/pretrained_models/GFPGANv1.4.pth", gfpgan_runner), |
|
"REAL-ESRGAN 2x": ("./assets/pretrained_models/RealESRGAN_x2.pth", realesrgan_runner), |
|
"REAL-ESRGAN 4x": ("./assets/pretrained_models/RealESRGAN_x4.pth", realesrgan_runner), |
|
"REAL-ESRGAN 8x": ("./assets/pretrained_models/RealESRGAN_x8.pth", realesrgan_runner) |
|
} |
|
|
|
cv2_interpolations = ["LANCZOS4", "CUBIC", "NEAREST"] |
|
|
|
def get_available_enhancer_names(): |
|
available = [] |
|
for name, data in supported_enhancers.items(): |
|
path = os.path.join(os.path.abspath(os.path.dirname(__file__)), data[0]) |
|
if os.path.exists(path): |
|
available.append(name) |
|
return available |
|
|
|
|
|
def load_face_enhancer_model(name='GFPGAN', device="cpu"): |
|
assert name in get_available_enhancer_names() + cv2_interpolations, f"Face enhancer {name} unavailable." |
|
if name in supported_enhancers.keys(): |
|
model_path, model_runner = supported_enhancers.get(name) |
|
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), model_path) |
|
if name == 'CodeFormer': |
|
model = CodeFormerEnhancer(model_path=model_path, device=device) |
|
elif name == 'GFPGAN': |
|
model = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=device) |
|
elif name == 'REAL-ESRGAN 2x': |
|
model = RealESRGAN(device, scale=2) |
|
model.load_weights(model_path, download=False) |
|
elif name == 'REAL-ESRGAN 4x': |
|
model = RealESRGAN(device, scale=4) |
|
model.load_weights(model_path, download=False) |
|
elif name == 'REAL-ESRGAN 8x': |
|
model = RealESRGAN(device, scale=8) |
|
model.load_weights(model_path, download=False) |
|
elif name == 'LANCZOS4': |
|
model = None |
|
model_runner = lambda img, _: cv2.resize(img, (512,512), interpolation=cv2.INTER_LANCZOS4) |
|
elif name == 'CUBIC': |
|
model = None |
|
model_runner = lambda img, _: cv2.resize(img, (512,512), interpolation=cv2.INTER_CUBIC) |
|
elif name == 'NEAREST': |
|
model = None |
|
model_runner = lambda img, _: cv2.resize(img, (512,512), interpolation=cv2.INTER_NEAREST) |
|
else: |
|
model = None |
|
return (model, model_runner) |
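# Illustrative usage (sketch, not part of the original module). Assumes the
# GFPGAN weights listed in `supported_enhancers` are present and that the
# hypothetical `face_path` points to a roughly frontal face image.
def _demo_enhance_face(face_path="face.jpg", enhancer_name="GFPGAN", device="cpu"):
    model, runner = load_face_enhancer_model(name=enhancer_name, device=device)
    face = cv2.imread(face_path)
    face = cv2.resize(face, (512, 512))  # GFPGAN expects an aligned 512x512 crop
    return runner(face, model)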
|
|
|
|
|
#### FACE_ENHANCER.PY CODE END ###
|
|
|
#### FACE_SWAPPER.PY CODE START ### |
|
|
|
import time |
|
import torch |
|
import onnx |
|
import cv2 |
|
import onnxruntime |
|
import numpy as np |
|
from tqdm import tqdm |
|
import torch.nn as nn |
|
from onnx import numpy_helper |
|
from skimage import transform as trans |
|
import torchvision.transforms.functional as TF  # aliased as TF so it does not shadow torch.nn.functional below
|
import torch.nn.functional as F |
|
from utils import mask_crop, laplacian_blending |
|
|
|
|
|
arcface_dst = np.array( |
|
[[38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366], |
|
[41.5493, 92.3655], [70.7299, 92.2041]], |
|
dtype=np.float32) |
|
|
|
def estimate_norm(lmk, image_size=112, mode='arcface'): |
|
assert lmk.shape == (5, 2) |
|
assert image_size % 112 == 0 or image_size % 128 == 0 |
|
if image_size % 112 == 0: |
|
ratio = float(image_size) / 112.0 |
|
diff_x = 0 |
|
else: |
|
ratio = float(image_size) / 128.0 |
|
diff_x = 8.0 * ratio |
|
dst = arcface_dst * ratio |
|
dst[:, 0] += diff_x |
|
tform = trans.SimilarityTransform() |
|
tform.estimate(lmk, dst) |
|
M = tform.params[0:2, :] |
|
return M |
|
|
|
|
|
def norm_crop2(img, landmark, image_size=112, mode='arcface'): |
|
M = estimate_norm(landmark, image_size, mode) |
|
warped = cv2.warpAffine(img, M, (image_size, image_size), borderValue=0.0) |
|
return warped, M |
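# Minimal sketch of the alignment step (in real use `lmk` is `face.kps` from the
# insightface detector). Synthetic keypoints are built here by scaling the
# ArcFace template into a blank 256x256 frame.
def _demo_alignment():
    frame = np.zeros((256, 256, 3), dtype=np.uint8)
    lmk = arcface_dst * 2.0  # pretend the face spans most of the frame
    aligned, M = norm_crop2(frame, lmk, image_size=128)
    return aligned.shape, M.shape  # -> (128, 128, 3), (2, 3)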
|
|
|
|
|
class Inswapper(): |
|
def __init__(self, model_file=None, batch_size=32, providers=['CPUExecutionProvider']): |
|
self.model_file = model_file |
|
self.batch_size = batch_size |
|
|
|
model = onnx.load(self.model_file) |
|
graph = model.graph |
|
self.emap = numpy_helper.to_array(graph.initializer[-1]) |
|
|
|
self.session_options = onnxruntime.SessionOptions() |
|
self.session = onnxruntime.InferenceSession(self.model_file, sess_options=self.session_options, providers=providers) |
|
|
|
def forward(self, imgs, latents): |
|
preds = [] |
|
for img, latent in zip(imgs, latents): |
|
img = img / 255 |
|
pred = self.session.run(['output'], {'target': img, 'source': latent})[0] |
|
preds.append(pred)

return preds
|
|
|
def get(self, imgs, target_faces, source_faces): |
|
imgs = list(imgs) |
|
|
|
preds = [None] * len(imgs) |
|
matrs = [None] * len(imgs) |
|
|
|
for idx, (img, target_face, source_face) in enumerate(zip(imgs, target_faces, source_faces)): |
|
matrix, blob, latent = self.prepare_data(img, target_face, source_face) |
|
pred = self.session.run(['output'], {'target': blob, 'source': latent})[0] |
|
pred = pred.transpose((0, 2, 3, 1))[0] |
|
pred = np.clip(255 * pred, 0, 255).astype(np.uint8)[:, :, ::-1] |
|
|
|
preds[idx] = pred |
|
matrs[idx] = matrix |
|
|
|
return (preds, matrs) |
|
|
|
def prepare_data(self, img, target_face, source_face): |
|
if isinstance(img, str): |
|
img = cv2.imread(img) |
|
|
|
aligned_img, matrix = norm_crop2(img, target_face.kps, 128) |
|
|
|
blob = cv2.dnn.blobFromImage(aligned_img, 1.0 / 255, (128, 128), (0., 0., 0.), swapRB=True) |
|
|
|
latent = source_face.normed_embedding.reshape((1, -1)) |
|
latent = np.dot(latent, self.emap) |
|
latent /= np.linalg.norm(latent) |
|
|
|
return (matrix, blob, latent) |
|
|
|
def batch_forward(self, img_list, target_f_list, source_f_list): |
|
num_samples = len(img_list) |
|
num_batches = (num_samples + self.batch_size - 1) // self.batch_size |
|
|
|
for i in tqdm(range(num_batches), desc="Generating face"): |
|
start_idx = i * self.batch_size |
|
end_idx = min((i + 1) * self.batch_size, num_samples) |
|
|
|
batch_img = img_list[start_idx:end_idx] |
|
batch_target_f = target_f_list[start_idx:end_idx] |
|
batch_source_f = source_f_list[start_idx:end_idx] |
|
|
|
batch_pred, batch_matr = self.get(batch_img, batch_target_f, batch_source_f) |
|
|
|
yield batch_pred, batch_matr |
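# Illustrative sketch of a single swap (assumes inswapper_128.onnx exists at the
# default path and that `frame`, `target_face`, `source_face` come from the
# insightface analyser, e.g. via face_analyser.analyse_face).
def _demo_single_swap(frame, target_face, source_face,
                      model_path="./assets/pretrained_models/inswapper_128.onnx"):
    swapper = Inswapper(model_file=model_path, batch_size=1)
    preds, matrs = swapper.get([frame], [target_face], [source_face])
    return preds[0], matrs[0]  # 128x128 BGR swap result and its affine matrix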
|
|
|
|
|
def paste_to_whole(foreground, background, matrix, mask=None, crop_mask=(0,0,0,0), blur_amount=0.1, erode_amount = 0.15, blend_method='linear'): |
|
inv_matrix = cv2.invertAffineTransform(matrix) |
|
fg_shape = foreground.shape[:2] |
|
bg_shape = (background.shape[1], background.shape[0]) |
|
foreground = cv2.warpAffine(foreground, inv_matrix, bg_shape, borderValue=0.0) |
|
|
|
if mask is None: |
|
mask = np.full(fg_shape, 1., dtype=np.float32) |
|
mask = mask_crop(mask, crop_mask) |
|
mask = cv2.warpAffine(mask, inv_matrix, bg_shape, borderValue=0.0) |
|
else: |
|
assert fg_shape == mask.shape[:2], "foreground & mask shape mismatch!" |
|
mask = mask_crop(mask, crop_mask).astype('float32') |
|
mask = cv2.warpAffine(mask, inv_matrix, (background.shape[1], background.shape[0]), borderValue=0.0) |
|
|
|
_mask = mask.copy() |
|
_mask[_mask > 0.05] = 1. |
|
non_zero_points = cv2.findNonZero(_mask) |
|
_, _, w, h = cv2.boundingRect(non_zero_points) |
|
mask_size = int(np.sqrt(w * h)) |
|
|
|
if erode_amount > 0: |
|
kernel_size = max(int(mask_size * erode_amount), 1) |
|
structuring_element = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_size, kernel_size)) |
|
mask = cv2.erode(mask, structuring_element) |
|
|
|
if blur_amount > 0: |
|
kernel_size = max(int(mask_size * blur_amount), 3) |
|
if kernel_size % 2 == 0: |
|
kernel_size += 1 |
|
mask = cv2.GaussianBlur(mask, (kernel_size, kernel_size), 0) |
|
|
|
mask = np.tile(np.expand_dims(mask, axis=-1), (1, 1, 3)) |
|
|
|
if blend_method == 'laplacian': |
|
composite_image = laplacian_blending(foreground, background, mask.clip(0,1), num_levels=4) |
|
else: |
|
composite_image = mask * foreground + (1 - mask) * background |
|
|
|
return composite_image.astype("uint8").clip(0, 255) |
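# Sketch of the paste-back step as used in app.py (assumes `pred` is the swapped
# face upscaled to 512x512 and `matrix` is the affine returned by Inswapper.get,
# which maps the original frame into the 128x128 crop).
def _demo_paste_back(pred, frame, matrix):
    matrix = matrix * (512 / 128)  # rescale the crop matrix to 512-crop space
    return paste_to_whole(pred, frame, matrix, blend_method="laplacian")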
|
|
|
#### FACE_SWAPPER.PY CODE END ### |
|
|
|
|
|
#### FACE_ANALYSER.PY CODE START ### |
|
|
|
import os |
|
import cv2 |
|
import numpy as np |
|
from tqdm import tqdm |
|
from utils import scale_bbox_from_center |
|
|
|
detect_conditions = [ |
|
"best detection", |
|
"left most", |
|
"right most", |
|
"top most", |
|
"bottom most", |
|
"middle", |
|
"biggest", |
|
"smallest", |
|
] |
|
|
|
swap_options_list = [ |
|
"All Face", |
|
"Specific Face", |
|
"Age less than", |
|
"Age greater than", |
|
"All Male", |
|
"All Female", |
|
"Left Most", |
|
"Right Most", |
|
"Top Most", |
|
"Bottom Most", |
|
"Middle", |
|
"Biggest", |
|
"Smallest", |
|
] |
|
|
|
def get_single_face(faces, method="best detection"): |
|
total_faces = len(faces) |
|
if total_faces == 1: |
|
return faces[0] |
|
|
|
print(f"{total_faces} face detected. Using {method} face.") |
|
if method == "best detection": |
|
return sorted(faces, key=lambda face: face["det_score"])[-1] |
|
elif method == "left most": |
|
return sorted(faces, key=lambda face: face["bbox"][0])[0] |
|
elif method == "right most": |
|
return sorted(faces, key=lambda face: face["bbox"][0])[-1] |
|
elif method == "top most": |
|
return sorted(faces, key=lambda face: face["bbox"][1])[0] |
|
elif method == "bottom most": |
|
return sorted(faces, key=lambda face: face["bbox"][1])[-1] |
|
elif method == "middle": |
|
return sorted(faces, key=lambda face: ( |
|
(face["bbox"][0] + face["bbox"][2]) / 2 - 0.5) ** 2 + |
|
((face["bbox"][1] + face["bbox"][3]) / 2 - 0.5) ** 2)[len(faces) // 2] |
|
elif method == "biggest": |
|
return sorted(faces, key=lambda face: (face["bbox"][2] - face["bbox"][0]) * (face["bbox"][3] - face["bbox"][1]))[-1] |
|
elif method == "smallest": |
|
return sorted(faces, key=lambda face: (face["bbox"][2] - face["bbox"][0]) * (face["bbox"][3] - face["bbox"][1]))[0] |
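# Illustrative sketch: the selection logic above only reads the "det_score" and
# "bbox" fields, so plain dicts are enough for a quick check.
def _demo_get_single_face():
    faces = [
        {"det_score": 0.70, "bbox": [10, 10, 60, 60]},
        {"det_score": 0.95, "bbox": [200, 20, 280, 100]},
    ]
    best = get_single_face(faces, method="best detection")  # second face (0.95)
    left = get_single_face(faces, method="left most")       # first face (x1=10)
    return best, left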
|
|
|
|
|
def analyse_face(image, model, return_single_face=True, detect_condition="best detection", scale=1.0): |
|
faces = model.get(image) |
|
if scale != 1: # landmark-scale |
|
for i, face in enumerate(faces): |
|
landmark = face['kps'] |
|
center = np.mean(landmark, axis=0) |
|
landmark = center + (landmark - center) * scale |
|
faces[i]['kps'] = landmark |
|
|
|
if not return_single_face: |
|
return faces |
|
|
|
return get_single_face(faces, method=detect_condition) |
|
|
|
|
|
def cosine_distance(a, b): |
|
a /= np.linalg.norm(a) |
|
b /= np.linalg.norm(b) |
|
return 1 - np.dot(a, b) |
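# Quick sanity check (sketch): identical embeddings give a distance near 0,
# orthogonal embeddings give a distance near 1. Note the helper normalises its
# arguments in place, hence the copies.
def _demo_cosine_distance():
    a = np.array([1.0, 0.0, 0.0])
    b = np.array([0.0, 1.0, 0.0])
    return cosine_distance(a.copy(), a.copy()), cosine_distance(a, b)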
|
|
|
|
|
def get_analysed_data(face_analyser, image_sequence, source_data, swap_condition="All Face", detect_condition="left most", scale=1.0):
|
if swap_condition != "Specific Face": |
|
source_path, age = source_data |
|
source_image = cv2.imread(source_path) |
|
analysed_source = analyse_face(source_image, face_analyser, return_single_face=True, detect_condition=detect_condition, scale=scale) |
|
else: |
|
analysed_source_specifics = [] |
|
source_specifics, threshold = source_data |
|
for source, specific in zip(*source_specifics): |
|
if source is None or specific is None: |
|
continue |
|
analysed_source = analyse_face(source, face_analyser, return_single_face=True, detect_condition=detect_condition, scale=scale) |
|
analysed_specific = analyse_face(specific, face_analyser, return_single_face=True, detect_condition=detect_condition, scale=scale) |
|
analysed_source_specifics.append([analysed_source, analysed_specific]) |
|
|
|
analysed_target_list = [] |
|
analysed_source_list = [] |
|
whole_frame_eql_list = [] |
|
num_faces_per_frame = [] |
|
|
|
total_frames = len(image_sequence) |
|
curr_idx = 0 |
|
for curr_idx, frame_path in tqdm(enumerate(image_sequence), total=total_frames, desc="Analysing face data"): |
|
frame = cv2.imread(frame_path) |
|
analysed_faces = analyse_face(frame, face_analyser, return_single_face=False, detect_condition=detect_condition, scale=scale) |
|
|
|
n_faces = 0 |
|
for analysed_face in analysed_faces: |
|
if swap_condition == "All Face": |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
elif swap_condition == "Age less than" and analysed_face["age"] < age: |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
elif swap_condition == "Age greater than" and analysed_face["age"] > age: |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
elif swap_condition == "All Male" and analysed_face["gender"] == 1: |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
elif swap_condition == "All Female" and analysed_face["gender"] == 0: |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
elif swap_condition == "Specific Face": |
|
for analysed_source, analysed_specific in analysed_source_specifics: |
|
distance = cosine_distance(analysed_specific["embedding"], analysed_face["embedding"]) |
|
if distance < threshold: |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
|
|
if swap_condition == "Left Most": |
|
analysed_face = get_single_face(analysed_faces, method="left most") |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
|
|
elif swap_condition == "Right Most": |
|
analysed_face = get_single_face(analysed_faces, method="right most") |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
|
|
elif swap_condition == "Top Most": |
|
analysed_face = get_single_face(analysed_faces, method="top most") |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
|
|
elif swap_condition == "Bottom Most": |
|
analysed_face = get_single_face(analysed_faces, method="bottom most") |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
|
|
elif swap_condition == "Middle": |
|
analysed_face = get_single_face(analysed_faces, method="middle") |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
|
|
elif swap_condition == "Biggest": |
|
analysed_face = get_single_face(analysed_faces, method="biggest") |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
|
|
elif swap_condition == "Smallest": |
|
analysed_face = get_single_face(analysed_faces, method="smallest") |
|
analysed_target_list.append(analysed_face) |
|
analysed_source_list.append(analysed_source) |
|
whole_frame_eql_list.append(frame_path) |
|
n_faces += 1 |
|
|
|
num_faces_per_frame.append(n_faces) |
|
|
|
return analysed_target_list, analysed_source_list, whole_frame_eql_list, num_faces_per_frame |
|
|
|
|
|
#### FACE_ANALYSER.PY CODE END ### |
|
|
|
#### UTILS.PY CODE START ### |
|
|
|
|
|
import os |
|
import cv2 |
|
import time |
|
import glob |
|
import shutil |
|
import platform |
|
import datetime |
|
import subprocess |
|
import numpy as np |
|
from threading import Thread |
|
from moviepy.editor import VideoFileClip, ImageSequenceClip |
|
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip |
|
|
|
|
|
logo_image = cv2.imread("./assets/images/logo.png", cv2.IMREAD_UNCHANGED) |
|
|
|
|
|
quality_types = ["poor", "low", "medium", "high", "best"] |
|
|
|
|
|
bitrate_quality_by_resolution = { |
|
240: {"poor": "300k", "low": "500k", "medium": "800k", "high": "1000k", "best": "1200k"}, |
|
360: {"poor": "500k","low": "800k","medium": "1200k","high": "1500k","best": "2000k"}, |
|
480: {"poor": "800k","low": "1200k","medium": "2000k","high": "2500k","best": "3000k"}, |
|
720: {"poor": "1500k","low": "2500k","medium": "4000k","high": "5000k","best": "6000k"}, |
|
1080: {"poor": "2500k","low": "4000k","medium": "6000k","high": "7000k","best": "8000k"}, |
|
1440: {"poor": "4000k","low": "6000k","medium": "8000k","high": "10000k","best": "12000k"}, |
|
2160: {"poor": "8000k","low": "10000k","medium": "12000k","high": "15000k","best": "20000k"} |
|
} |
|
|
|
|
|
crf_quality_by_resolution = { |
|
240: {"poor": 45, "low": 35, "medium": 28, "high": 23, "best": 20}, |
|
360: {"poor": 35, "low": 28, "medium": 23, "high": 20, "best": 18}, |
|
480: {"poor": 28, "low": 23, "medium": 20, "high": 18, "best": 16}, |
|
720: {"poor": 23, "low": 20, "medium": 18, "high": 16, "best": 14}, |
|
1080: {"poor": 20, "low": 18, "medium": 16, "high": 14, "best": 12}, |
|
1440: {"poor": 18, "low": 16, "medium": 14, "high": 12, "best": 10}, |
|
2160: {"poor": 16, "low": 14, "medium": 12, "high": 10, "best": 8} |
|
} |
|
|
|
|
|
def get_bitrate_for_resolution(resolution, quality): |
|
available_resolutions = list(bitrate_quality_by_resolution.keys()) |
|
closest_resolution = min(available_resolutions, key=lambda x: abs(x - resolution)) |
|
return bitrate_quality_by_resolution[closest_resolution][quality] |
|
|
|
|
|
def get_crf_for_resolution(resolution, quality): |
|
available_resolutions = list(crf_quality_by_resolution.keys()) |
|
closest_resolution = min(available_resolutions, key=lambda x: abs(x - resolution)) |
|
return crf_quality_by_resolution[closest_resolution][quality] |
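# Example (sketch): a 1080p export at "high" quality maps to a 7000k bitrate and
# CRF 14; resolutions between the listed keys snap to the closest entry.
def _demo_quality_lookup():
    return get_bitrate_for_resolution(1080, "high"), get_crf_for_resolution(1080, "high")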
|
|
|
|
|
def get_video_bitrate(video_file): |
|
ffprobe_cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', |
|
'stream=bit_rate', '-of', 'default=noprint_wrappers=1:nokey=1', video_file] |
|
result = subprocess.run(ffprobe_cmd, stdout=subprocess.PIPE) |
|
kbps = max(int(result.stdout) // 1000, 10) |
|
return str(kbps) + 'k' |
|
|
|
|
|
def trim_video(video_path, output_path, start_frame, stop_frame): |
|
video_name, _ = os.path.splitext(os.path.basename(video_path)) |
|
trimmed_video_filename = video_name + "_trimmed" + ".mp4" |
|
temp_path = os.path.join(output_path, "trim") |
|
os.makedirs(temp_path, exist_ok=True) |
|
trimmed_video_file_path = os.path.join(temp_path, trimmed_video_filename) |
|
|
|
video = VideoFileClip(video_path, fps_source="fps") |
|
fps = video.fps |
|
start_time = start_frame / fps |
|
duration = (stop_frame - start_frame) / fps |
|
|
|
bitrate = get_bitrate_for_resolution(min(*video.size), "high") |
|
|
|
trimmed_video = video.subclip(start_time, start_time + duration) |
|
trimmed_video.write_videofile( |
|
trimmed_video_file_path, codec="libx264", audio_codec="aac", bitrate=bitrate, |
|
) |
|
trimmed_video.close() |
|
video.close() |
|
|
|
return trimmed_video_file_path |
|
|
|
|
|
def open_directory(path=None): |
|
if path is None: |
|
return |
|
try: |
|
os.startfile(path) |
|
except: |
|
subprocess.Popen(["xdg-open", path]) |
|
|
|
|
|
class StreamerThread(object): |
|
def __init__(self, src=0): |
|
self.capture = cv2.VideoCapture(src) |
|
self.capture.set(cv2.CAP_PROP_BUFFERSIZE, 2) |
|
self.FPS = 1 / 30 |
|
self.FPS_MS = int(self.FPS * 1000) |
|
self.thread = None |
|
self.stopped = False |
|
self.frame = None |
|
|
|
def start(self): |
|
self.thread = Thread(target=self.update, args=()) |
|
self.thread.daemon = True |
|
self.thread.start() |
|
|
|
def stop(self): |
|
self.stopped = True |
|
self.thread.join() |
|
print("stopped") |
|
|
|
def update(self): |
|
while not self.stopped: |
|
if self.capture.isOpened(): |
|
(self.status, self.frame) = self.capture.read() |
|
time.sleep(self.FPS) |
|
|
|
|
|
class ProcessBar: |
|
def __init__(self, bar_length, total, before="β¬", after="π¨"): |
|
self.bar_length = bar_length |
|
self.total = total |
|
self.before = before |
|
self.after = after |
|
self.bar = [self.before] * bar_length |
|
self.start_time = time.time() |
|
|
|
def get(self, index): |
|
total = self.total |
|
elapsed_time = time.time() - self.start_time |
|
average_time_per_iteration = elapsed_time / (index + 1) |
|
remaining_iterations = total - (index + 1) |
|
estimated_remaining_time = remaining_iterations * average_time_per_iteration |
|
|
|
self.bar[int(index / total * self.bar_length)] = self.after |
|
info_text = f"({index+1}/{total}) {''.join(self.bar)} " |
|
info_text += f"(ETR: {int(estimated_remaining_time // 60)} min {int(estimated_remaining_time % 60)} sec)" |
|
return info_text |
|
|
|
|
|
def add_logo_to_image(img, logo=logo_image): |
|
logo_size = int(img.shape[1] * 0.1) |
|
logo = cv2.resize(logo, (logo_size, logo_size)) |
|
if logo.shape[2] == 4: |
|
alpha = logo[:, :, 3] |
|
else: |
|
alpha = np.ones_like(logo[:, :, 0]) * 255 |
|
padding = int(logo_size * 0.1) |
|
roi = img.shape[0] - logo_size - padding, img.shape[1] - logo_size - padding |
|
for c in range(0, 3): |
|
img[roi[0] : roi[0] + logo_size, roi[1] : roi[1] + logo_size, c] = ( |
|
alpha / 255.0 |
|
) * logo[:, :, c] + (1 - alpha / 255.0) * img[ |
|
roi[0] : roi[0] + logo_size, roi[1] : roi[1] + logo_size, c |
|
] |
|
return img |
|
|
|
|
|
def split_list_by_lengths(data, length_list): |
|
split_data = [] |
|
start_idx = 0 |
|
for length in length_list: |
|
end_idx = start_idx + length |
|
sublist = data[start_idx:end_idx] |
|
split_data.append(sublist) |
|
start_idx = end_idx |
|
return split_data |
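# Sketch: regroup a flat per-face list into per-frame sublists using the face
# counts collected during analysis (num_faces_per_frame in face_analyser).
def _demo_split_list_by_lengths():
    return split_list_by_lengths(["a", "b", "c", "d"], [1, 0, 3])
    # -> [["a"], [], ["b", "c", "d"]]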
|
|
|
|
|
def merge_img_sequence_from_ref(ref_video_path, image_sequence, output_file_name): |
|
video_clip = VideoFileClip(ref_video_path, fps_source="fps") |
|
fps = video_clip.fps |
|
duration = video_clip.duration |
|
total_frames = video_clip.reader.nframes |
|
audio_clip = video_clip.audio if video_clip.audio is not None else None |
|
edited_video_clip = ImageSequenceClip(image_sequence, fps=fps) |
|
|
|
if audio_clip is not None: |
|
edited_video_clip = edited_video_clip.set_audio(audio_clip) |
|
|
|
bitrate = get_bitrate_for_resolution(min(*edited_video_clip.size), "high") |
|
|
|
edited_video_clip.set_duration(duration).write_videofile( |
|
output_file_name, codec="libx264", bitrate=bitrate, |
|
) |
|
edited_video_clip.close() |
|
video_clip.close() |
|
|
|
|
|
def scale_bbox_from_center(bbox, scale_width, scale_height, image_width, image_height): |
|
# Extract the coordinates of the bbox |
|
x1, y1, x2, y2 = bbox |
|
|
|
# Calculate the center point of the bbox |
|
center_x = (x1 + x2) / 2 |
|
center_y = (y1 + y2) / 2 |
|
|
|
# Calculate the new width and height of the bbox based on the scaling factors |
|
width = x2 - x1 |
|
height = y2 - y1 |
|
new_width = width * scale_width |
|
new_height = height * scale_height |
|
|
|
# Calculate the new coordinates of the bbox, considering the image boundaries |
|
new_x1 = center_x - new_width / 2 |
|
new_y1 = center_y - new_height / 2 |
|
new_x2 = center_x + new_width / 2 |
|
new_y2 = center_y + new_height / 2 |
|
|
|
# Adjust the coordinates to ensure the bbox remains within the image boundaries |
|
new_x1 = max(0, new_x1) |
|
new_y1 = max(0, new_y1) |
|
new_x2 = min(image_width - 1, new_x2) |
|
new_y2 = min(image_height - 1, new_y2) |
|
|
|
# Return the scaled bbox coordinates |
|
scaled_bbox = [new_x1, new_y1, new_x2, new_y2] |
|
return scaled_bbox |
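# Sketch: grow a 100x100 box by 1.5x around its centre inside a 200x200 image.
def _demo_scale_bbox():
    return scale_bbox_from_center([50, 50, 150, 150], 1.5, 1.5, 200, 200)
    # -> [25.0, 25.0, 175.0, 175.0]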
|
|
|
|
|
def laplacian_blending(A, B, m, num_levels=7): |
|
assert A.shape == B.shape |
|
assert B.shape == m.shape |
|
height = m.shape[0] |
|
width = m.shape[1] |
|
size_list = np.array([4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192]) |
|
size = size_list[np.where(size_list > max(height, width))][0] |
|
GA = np.zeros((size, size, 3), dtype=np.float32) |
|
GA[:height, :width, :] = A |
|
GB = np.zeros((size, size, 3), dtype=np.float32) |
|
GB[:height, :width, :] = B |
|
GM = np.zeros((size, size, 3), dtype=np.float32) |
|
GM[:height, :width, :] = m |
|
gpA = [GA] |
|
gpB = [GB] |
|
gpM = [GM] |
|
for i in range(num_levels): |
|
GA = cv2.pyrDown(GA) |
|
GB = cv2.pyrDown(GB) |
|
GM = cv2.pyrDown(GM) |
|
gpA.append(np.float32(GA)) |
|
gpB.append(np.float32(GB)) |
|
gpM.append(np.float32(GM)) |
|
lpA = [gpA[num_levels-1]] |
|
lpB = [gpB[num_levels-1]] |
|
gpMr = [gpM[num_levels-1]] |
|
for i in range(num_levels-1,0,-1): |
|
LA = np.subtract(gpA[i-1], cv2.pyrUp(gpA[i])) |
|
LB = np.subtract(gpB[i-1], cv2.pyrUp(gpB[i])) |
|
lpA.append(LA) |
|
lpB.append(LB) |
|
gpMr.append(gpM[i-1]) |
|
LS = [] |
|
for la,lb,gm in zip(lpA,lpB,gpMr): |
|
ls = la * gm + lb * (1.0 - gm) |
|
LS.append(ls) |
|
ls_ = LS[0] |
|
for i in range(1,num_levels): |
|
ls_ = cv2.pyrUp(ls_) |
|
ls_ = cv2.add(ls_, LS[i]) |
|
ls_ = ls_[:height, :width, :] |
|
#ls_ = (ls_ - np.min(ls_)) * (255.0 / (np.max(ls_) - np.min(ls_))) |
|
return ls_.clip(0, 255) |
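# Sketch: blend a white and a black image with a hard left/right mask; the
# pyramid blending turns the seam into a smooth gradient.
def _demo_laplacian_blending():
    a = np.full((100, 100, 3), 255, dtype=np.float32)
    b = np.zeros((100, 100, 3), dtype=np.float32)
    m = np.zeros((100, 100, 3), dtype=np.float32)
    m[:, :50] = 1.0
    return laplacian_blending(a, b, m)  # float image in [0, 255]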
|
|
|
|
|
def mask_crop(mask, crop): |
|
top, bottom, left, right = crop |
|
shape = mask.shape |
|
top = int(top) |
|
bottom = int(bottom) |
|
if top + bottom < shape[0]:  # compare against mask height (rows)
|
if top > 0: mask[:top, :] = 0 |
|
if bottom > 0: mask[-bottom:, :] = 0 |
|
|
|
left = int(left) |
|
right = int(right) |
|
if left + right < shape[1]:  # compare against mask width (columns)
|
if left > 0: mask[:, :left] = 0 |
|
if right > 0: mask[:, -right:] = 0 |
|
|
|
return mask |
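# Sketch: zero out a 10 pixel band at the top and right of a square mask, the
# same way the crop sliders in app.py are applied (order: top, bottom, left, right).
def _demo_mask_crop():
    return mask_crop(np.ones((512, 512), dtype=np.float32), (10, 0, 0, 10))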
|
|
|
def create_image_grid(images, size=128): |
|
num_images = len(images) |
|
num_cols = int(np.ceil(np.sqrt(num_images))) |
|
num_rows = int(np.ceil(num_images / num_cols)) |
|
grid = np.zeros((num_rows * size, num_cols * size, 3), dtype=np.uint8) |
|
|
|
for i, image in enumerate(images): |
|
row_idx = (i // num_cols) * size |
|
col_idx = (i % num_cols) * size |
|
image = cv2.resize(image.copy(), (size,size)) |
|
if image.dtype != np.uint8: |
|
image = (image.astype('float32') * 255).astype('uint8') |
|
if image.ndim == 2: |
|
image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) |
|
grid[row_idx:row_idx + size, col_idx:col_idx + size] = image |
|
|
|
return grid |
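# Sketch: tile three dummy frames into the preview grid shown in the UI
# (3 images -> 2 columns x 2 rows, i.e. a 256x256 grid at size=128).
def _demo_create_image_grid():
    frames = [np.full((64, 64, 3), value, dtype=np.uint8) for value in (0, 128, 255)]
    return create_image_grid(frames, size=128)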
|
|
|
|
|
#### UTILS.PY CODE END ### |
|
|
|
#### APP.PY CODE START ###
|
|
|
import os |
|
import spaces |
|
import cv2 |
|
import glob |
|
import time |
|
import torch |
|
import shutil |
|
import argparse |
|
import platform |
|
import datetime |
|
import subprocess |
|
import insightface |
|
import onnxruntime |
|
import numpy as np |
|
import gradio as gr |
|
import threading |
|
import queue |
|
from tqdm import tqdm |
|
import concurrent.futures |
|
from moviepy.editor import VideoFileClip |
|
|
|
from nsfw_checker import NSFWChecker |
|
from face_swapper import Inswapper, paste_to_whole |
|
from face_analyser import detect_conditions, get_analysed_data, swap_options_list |
|
from face_parsing import init_parsing_model, get_parsed_mask, mask_regions, mask_regions_to_list |
|
from face_enhancer import get_available_enhancer_names, load_face_enhancer_model, cv2_interpolations |
|
from utils import trim_video, StreamerThread, ProcessBar, open_directory, split_list_by_lengths, merge_img_sequence_from_ref, create_image_grid |
|
|
|
## |
|
|
|
parser = argparse.ArgumentParser(description="Swap-Mukham Face Swapper") |
|
parser.add_argument("--out_dir", help="Default Output directory", default=os.getcwd()) |
|
parser.add_argument("--batch_size", help="Gpu batch size", default=32) |
|
parser.add_argument("--cuda", action="store_true", help="Enable cuda", default=False) |
|
parser.add_argument( |
|
"--colab", action="store_true", help="Enable colab mode", default=False |
|
) |
|
user_args = parser.parse_args() |
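# Example invocation (sketch; flags as defined above):
#   python app.py --cuda --batch_size 16 --out_dir ./results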
|
|
|
## |
|
|
|
USE_COLAB = user_args.colab |
|
USE_CUDA = user_args.cuda |
|
DEF_OUTPUT_PATH = user_args.out_dir |
|
BATCH_SIZE = int(user_args.batch_size) |
|
WORKSPACE = None |
|
OUTPUT_FILE = None |
|
CURRENT_FRAME = None |
|
STREAMER = None |
|
DETECT_CONDITION = "best detection" |
|
DETECT_SIZE = 640 |
|
DETECT_THRESH = 0.6 |
|
NUM_OF_SRC_SPECIFIC = 10 |
|
MASK_INCLUDE = [ |
|
"Skin", |
|
"R-Eyebrow", |
|
"L-Eyebrow", |
|
"L-Eye", |
|
"R-Eye", |
|
"Nose", |
|
"Mouth", |
|
"L-Lip", |
|
"U-Lip" |
|
] |
|
MASK_SOFT_KERNEL = 17 |
|
MASK_SOFT_ITERATIONS = 10 |
|
MASK_BLUR_AMOUNT = 0.1 |
|
MASK_ERODE_AMOUNT = 0.15 |
|
|
|
FACE_SWAPPER = None |
|
FACE_ANALYSER = None |
|
FACE_ENHANCER = None |
|
FACE_PARSER = None |
|
NSFW_DETECTOR = None |
|
FACE_ENHANCER_LIST = ["NONE"] |
|
FACE_ENHANCER_LIST.extend(get_available_enhancer_names()) |
|
FACE_ENHANCER_LIST.extend(cv2_interpolations) |
|
|
|
## |
|
# Note: Non CUDA users may change settings here |
|
|
|
PROVIDER = ["CPUExecutionProvider"] |
|
|
|
if USE_CUDA: |
|
available_providers = onnxruntime.get_available_providers() |
|
if "CUDAExecutionProvider" in available_providers: |
|
print("\n********** Running on CUDA **********\n") |
|
PROVIDER = ["CUDAExecutionProvider", "CPUExecutionProvider"] |
|
else: |
|
USE_CUDA = False |
|
print("\n********** CUDA unavailable running on CPU **********\n") |
|
else: |
|
USE_CUDA = False |
|
print("\n********** Running on CPU **********\n") |
|
|
|
device = "cuda" if USE_CUDA else "cpu" |
|
EMPTY_CACHE = lambda: torch.cuda.empty_cache() if device == "cuda" else None |
|
|
|
## |
|
|
|
def load_face_analyser_model(name="buffalo_l"): |
|
global FACE_ANALYSER |
|
if FACE_ANALYSER is None: |
|
FACE_ANALYSER = insightface.app.FaceAnalysis(name=name, providers=PROVIDER) |
|
FACE_ANALYSER.prepare( |
|
ctx_id=0, det_size=(DETECT_SIZE, DETECT_SIZE), det_thresh=DETECT_THRESH |
|
) |
|
|
|
|
|
def load_face_swapper_model(path="./assets/pretrained_models/inswapper_128.onnx"): |
|
global FACE_SWAPPER |
|
if FACE_SWAPPER is None: |
|
batch = int(BATCH_SIZE) if device == "cuda" else 1 |
|
FACE_SWAPPER = Inswapper(model_file=path, batch_size=batch, providers=PROVIDER) |
|
|
|
|
|
def load_face_parser_model(path="./assets/pretrained_models/79999_iter.pth"): |
|
global FACE_PARSER |
|
if FACE_PARSER is None: |
|
FACE_PARSER = init_parsing_model(path, device=device) |
|
|
|
def load_nsfw_detector_model(path="./assets/pretrained_models/open-nsfw.onnx"): |
|
global NSFW_DETECTOR |
|
if NSFW_DETECTOR is None: |
|
NSFW_DETECTOR = NSFWChecker(model_path=path, providers=PROVIDER) |
|
|
|
|
|
load_face_analyser_model() |
|
load_face_swapper_model() |
|
|
|
## |
|
|
|
|
|
@spaces.GPU(duration=300, enable_queue=True) |
|
def process( |
|
input_type, |
|
image_path, |
|
video_path, |
|
directory_path, |
|
source_path, |
|
output_path, |
|
output_name, |
|
keep_output_sequence, |
|
condition, |
|
age, |
|
distance, |
|
face_enhancer_name, |
|
enable_face_parser, |
|
mask_includes, |
|
mask_soft_kernel, |
|
mask_soft_iterations, |
|
blur_amount, |
|
erode_amount, |
|
face_scale, |
|
enable_laplacian_blend, |
|
crop_top, |
|
crop_bott, |
|
crop_left, |
|
crop_right, |
|
*specifics, |
|
): |
|
global WORKSPACE |
|
global OUTPUT_FILE |
|
global PREVIEW |
|
WORKSPACE, OUTPUT_FILE, PREVIEW = None, None, None |
|
|
|
## |
|
|
|
def ui_before(): |
|
return ( |
|
gr.update(visible=True, value=PREVIEW), |
|
gr.update(interactive=False), |
|
gr.update(interactive=False), |
|
gr.update(visible=False), |
|
) |
|
|
|
def ui_after(): |
|
return ( |
|
gr.update(visible=True, value=PREVIEW), |
|
gr.update(interactive=True), |
|
gr.update(interactive=True), |
|
gr.update(visible=False), |
|
) |
|
|
|
def ui_after_vid(): |
|
return ( |
|
gr.update(visible=False), |
|
gr.update(interactive=True), |
|
gr.update(interactive=True), |
|
gr.update(value=OUTPUT_FILE, visible=True), |
|
) |
|
|
|
start_time = time.time() |
|
total_exec_time = lambda start_time: divmod(time.time() - start_time, 60) |
|
get_finish_text = lambda start_time: f"✔️ Completed in {int(total_exec_time(start_time)[0])} min {int(total_exec_time(start_time)[1])} sec."
|
|
|
## |
|
|
|
yield "### \n β Loading NSFW detector model...", *ui_before() |
|
load_nsfw_detector_model() |
|
|
|
yield "### \n β Loading face analyser model...", *ui_before() |
|
load_face_analyser_model() |
|
|
|
yield "### \n β Loading face swapper model...", *ui_before() |
|
load_face_swapper_model() |
|
|
|
if face_enhancer_name != "NONE": |
|
if face_enhancer_name not in cv2_interpolations: |
|
yield f"### \n β Loading {face_enhancer_name} model...", *ui_before() |
|
FACE_ENHANCER = load_face_enhancer_model(name=face_enhancer_name, device=device) |
|
else: |
|
FACE_ENHANCER = None |
|
|
|
if enable_face_parser: |
|
yield "### \n β Loading face parsing model...", *ui_before() |
|
load_face_parser_model() |
|
|
|
includes = mask_regions_to_list(mask_includes) |
|
specifics = list(specifics) |
|
half = len(specifics) // 2 |
|
sources = specifics[:half] |
|
specifics = specifics[half:] |
|
if crop_top > crop_bott: |
|
crop_top, crop_bott = crop_bott, crop_top |
|
if crop_left > crop_right: |
|
crop_left, crop_right = crop_right, crop_left |
|
crop_mask = (crop_top, 511-crop_bott, crop_left, 511-crop_right) |
|
|
|
def swap_process(image_sequence): |
|
## |
|
|
|
yield "### \n β Checking contents...", *ui_before() |
|
nsfw = NSFW_DETECTOR.is_nsfw(image_sequence) |
|
if nsfw: |
|
message = "NSFW Content detected !!!" |
|
yield f"### \n π {message}", *ui_before() |
|
assert not nsfw, message |
|
return False |
|
EMPTY_CACHE() |
|
|
|
## |
|
|
|
yield "### \n β Analysing face data...", *ui_before() |
|
if condition != "Specific Face": |
|
source_data = source_path, age |
|
else: |
|
source_data = ((sources, specifics), distance) |
|
analysed_targets, analysed_sources, whole_frame_list, num_faces_per_frame = get_analysed_data( |
|
FACE_ANALYSER, |
|
image_sequence, |
|
source_data, |
|
swap_condition=condition, |
|
detect_condition=DETECT_CONDITION, |
|
scale=face_scale |
|
) |
|
|
|
## |
|
|
|
yield "### \n β Generating faces...", *ui_before() |
|
preds = [] |
|
matrs = [] |
|
count = 0 |
|
global PREVIEW |
|
for batch_pred, batch_matr in FACE_SWAPPER.batch_forward(whole_frame_list, analysed_targets, analysed_sources): |
|
preds.extend(batch_pred) |
|
matrs.extend(batch_matr) |
|
EMPTY_CACHE() |
|
count += 1 |
|
|
|
if USE_CUDA: |
|
image_grid = create_image_grid(batch_pred, size=128) |
|
PREVIEW = image_grid[:, :, ::-1] |
|
yield f"### \n β Generating face Batch {count}", *ui_before() |
|
|
|
## |
|
|
|
generated_len = len(preds) |
|
if face_enhancer_name != "NONE": |
|
yield f"### \n β Upscaling faces with {face_enhancer_name}...", *ui_before() |
|
for idx, pred in tqdm(enumerate(preds), total=generated_len, desc=f"Upscaling with {face_enhancer_name}"): |
|
enhancer_model, enhancer_model_runner = FACE_ENHANCER |
|
pred = enhancer_model_runner(pred, enhancer_model) |
|
preds[idx] = cv2.resize(pred, (512,512)) |
|
EMPTY_CACHE() |
|
|
|
## |
|
|
|
if enable_face_parser: |
|
yield "### \n β Face-parsing mask...", *ui_before() |
|
masks = [] |
|
count = 0 |
|
for batch_mask in get_parsed_mask(FACE_PARSER, preds, classes=includes, device=device, batch_size=BATCH_SIZE, softness=int(mask_soft_iterations)): |
|
masks.append(batch_mask) |
|
EMPTY_CACHE() |
|
count += 1 |
|
|
|
if len(batch_mask) > 1: |
|
image_grid = create_image_grid(batch_mask, size=128) |
|
PREVIEW = image_grid[:, :, ::-1] |
|
yield f"### \n β Face parsing Batch {count}", *ui_before() |
|
masks = np.concatenate(masks, axis=0) if len(masks) >= 1 else masks |
|
else: |
|
masks = [None] * generated_len |
|
|
|
## |
|
|
|
split_preds = split_list_by_lengths(preds, num_faces_per_frame) |
|
del preds |
|
split_matrs = split_list_by_lengths(matrs, num_faces_per_frame) |
|
del matrs |
|
split_masks = split_list_by_lengths(masks, num_faces_per_frame) |
|
del masks |
|
|
|
## |
|
|
|
yield "### \n β Pasting back...", *ui_before() |
|
def post_process(frame_idx, frame_img, split_preds, split_matrs, split_masks, enable_laplacian_blend, crop_mask, blur_amount, erode_amount): |
|
whole_img_path = frame_img |
|
whole_img = cv2.imread(whole_img_path) |
|
blend_method = 'laplacian' if enable_laplacian_blend else 'linear' |
|
for p, m, mask in zip(split_preds[frame_idx], split_matrs[frame_idx], split_masks[frame_idx]): |
|
p = cv2.resize(p, (512,512)) |
|
mask = cv2.resize(mask, (512,512)) if mask is not None else None |
|
m /= 0.25 |
|
whole_img = paste_to_whole(p, whole_img, m, mask=mask, crop_mask=crop_mask, blend_method=blend_method, blur_amount=blur_amount, erode_amount=erode_amount) |
|
cv2.imwrite(whole_img_path, whole_img) |
|
|
|
def concurrent_post_process(image_sequence, *args): |
|
with concurrent.futures.ThreadPoolExecutor() as executor: |
|
futures = [] |
|
for idx, frame_img in enumerate(image_sequence): |
|
future = executor.submit(post_process, idx, frame_img, *args) |
|
futures.append(future) |
|
|
|
for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Pasting back"): |
|
result = future.result() |
|
|
|
concurrent_post_process( |
|
image_sequence, |
|
split_preds, |
|
split_matrs, |
|
split_masks, |
|
enable_laplacian_blend, |
|
crop_mask, |
|
blur_amount, |
|
erode_amount |
|
) |
|
|
|
|
|
## |
|
|
|
if input_type == "Image": |
|
target = cv2.imread(image_path) |
|
output_file = os.path.join(output_path, output_name + ".png") |
|
cv2.imwrite(output_file, target) |
|
|
|
for info_update in swap_process([output_file]): |
|
yield info_update |
|
|
|
OUTPUT_FILE = output_file |
|
WORKSPACE = output_path |
|
PREVIEW = cv2.imread(output_file)[:, :, ::-1] |
|
|
|
yield get_finish_text(start_time), *ui_after()
|
|
|
## |
|
|
|
elif input_type == "Video": |
|
temp_path = os.path.join(output_path, output_name, "sequence") |
|
os.makedirs(temp_path, exist_ok=True) |
|
|
|
yield "### \n β Extracting video frames...", *ui_before() |
|
image_sequence = [] |
|
cap = cv2.VideoCapture(video_path) |
|
curr_idx = 0 |
|
while True: |
|
ret, frame = cap.read() |
|
if not ret:break |
|
frame_path = os.path.join(temp_path, f"frame_{curr_idx}.jpg") |
|
cv2.imwrite(frame_path, frame) |
|
image_sequence.append(frame_path) |
|
curr_idx += 1 |
|
cap.release() |
|
cv2.destroyAllWindows() |
|
|
|
for info_update in swap_process(image_sequence): |
|
yield info_update |
|
|
|
yield "### \n β Merging sequence...", *ui_before() |
|
output_video_path = os.path.join(output_path, output_name + ".mp4") |
|
merge_img_sequence_from_ref(video_path, image_sequence, output_video_path) |
|
|
|
if os.path.exists(temp_path) and not keep_output_sequence: |
|
yield "### \n β Removing temporary files...", *ui_before() |
|
shutil.rmtree(temp_path) |
|
|
|
WORKSPACE = output_path |
|
OUTPUT_FILE = output_video_path |
|
|
|
yield get_finish_text(start_time), *ui_after_vid()
|
|
|
## |
|
|
|
elif input_type == "Directory": |
|
extensions = ["jpg", "jpeg", "png", "bmp", "tiff", "ico", "webp"] |
|
temp_path = os.path.join(output_path, output_name) |
|
if os.path.exists(temp_path): |
|
shutil.rmtree(temp_path) |
|
os.mkdir(temp_path) |
|
|
|
file_paths = []
|
for file_path in glob.glob(os.path.join(directory_path, "*")): |
|
if any(file_path.lower().endswith(ext) for ext in extensions): |
|
img = cv2.imread(file_path) |
|
new_file_path = os.path.join(temp_path, os.path.basename(file_path)) |
|
cv2.imwrite(new_file_path, img) |
|
file_paths.append(new_file_path) |
|
|
|
for info_update in swap_process(file_paths): |
|
yield info_update |
|
|
|
PREVIEW = cv2.imread(file_paths[-1])[:, :, ::-1] |
|
WORKSPACE = temp_path |
|
OUTPUT_FILE = file_paths[-1] |
|
|
|
yield get_finish_text(start_time), *ui_after()
|
|
|
## |
|
|
|
elif input_type == "Stream": |
|
pass |
|
|
|
|
|
## |
|
|
|
|
|
def update_radio(value): |
|
if value == "Image": |
|
return ( |
|
gr.update(visible=True), |
|
gr.update(visible=False), |
|
gr.update(visible=False), |
|
) |
|
elif value == "Video": |
|
return ( |
|
gr.update(visible=False), |
|
gr.update(visible=True), |
|
gr.update(visible=False), |
|
) |
|
elif value == "Directory": |
|
return ( |
|
gr.update(visible=False), |
|
gr.update(visible=False), |
|
gr.update(visible=True), |
|
) |
|
elif value == "Stream": |
|
return ( |
|
gr.update(visible=False), |
|
gr.update(visible=False), |
|
gr.update(visible=True), |
|
) |
|
|
|
|
|
def swap_option_changed(value): |
|
if value.startswith("Age"): |
|
return ( |
|
gr.update(visible=True), |
|
gr.update(visible=False), |
|
gr.update(visible=True), |
|
) |
|
elif value == "Specific Face": |
|
return ( |
|
gr.update(visible=False), |
|
gr.update(visible=True), |
|
gr.update(visible=False), |
|
) |
|
return gr.update(visible=False), gr.update(visible=False), gr.update(visible=True) |
|
|
|
|
|
def video_changed(video_path): |
|
sliders_update = gr.Slider.update |
|
button_update = gr.Button.update |
|
number_update = gr.Number.update |
|
|
|
if video_path is None: |
|
return ( |
|
sliders_update(minimum=0, maximum=0, value=0), |
|
sliders_update(minimum=1, maximum=1, value=1), |
|
number_update(value=1), |
|
) |
|
try: |
|
clip = VideoFileClip(video_path) |
|
fps = clip.fps |
|
total_frames = clip.reader.nframes |
|
clip.close() |
|
return ( |
|
sliders_update(minimum=0, maximum=total_frames, value=0, interactive=True), |
|
sliders_update( |
|
minimum=0, maximum=total_frames, value=total_frames, interactive=True |
|
), |
|
number_update(value=fps), |
|
) |
|
except: |
|
return ( |
|
sliders_update(value=0), |
|
sliders_update(value=0), |
|
number_update(value=1), |
|
) |
|
|
|
|
|
def analyse_settings_changed(detect_condition, detection_size, detection_threshold): |
|
yield "### \n β Applying new values..." |
|
global FACE_ANALYSER |
|
global DETECT_CONDITION |
|
DETECT_CONDITION = detect_condition |
|
FACE_ANALYSER = insightface.app.FaceAnalysis(name="buffalo_l", providers=PROVIDER) |
|
FACE_ANALYSER.prepare( |
|
ctx_id=0, |
|
det_size=(int(detection_size), int(detection_size)), |
|
det_thresh=float(detection_threshold), |
|
) |
|
yield f"### \n βοΈ Applied detect condition:{detect_condition}, detection size: {detection_size}, detection threshold: {detection_threshold}" |
|
|
|
|
|
def stop_running(): |
|
global STREAMER |
|
if hasattr(STREAMER, "stop"): |
|
STREAMER.stop() |
|
STREAMER = None |
|
return "Cancelled" |
|
|
|
|
|
def slider_changed(show_frame, video_path, frame_index): |
|
if not show_frame: |
|
return None, None |
|
if video_path is None: |
|
return None, None |
|
clip = VideoFileClip(video_path) |
|
frame = clip.get_frame(frame_index / clip.fps) |
|
frame_array = np.array(frame) |
|
clip.close() |
|
return gr.Image.update(value=frame_array, visible=True), gr.Video.update( |
|
visible=False |
|
) |
|
|
|
|
|
def trim_and_reload(video_path, output_path, output_name, start_frame, stop_frame): |
|
yield video_path, f"### \n ⌛ Trimming video frame {start_frame} to {stop_frame}..."
|
try: |
|
output_path = os.path.join(output_path, output_name) |
|
trimmed_video = trim_video(video_path, output_path, start_frame, stop_frame) |
|
yield trimmed_video, "### \n ✔️ Video trimmed and reloaded."
|
except Exception as e: |
|
print(e) |
|
yield video_path, "### \n ❌ Video trimming failed. See console for more info."
|
|
|
|
|
## |
|
|
|
css = """ |
|
footer{display:none !important} |
|
""" |
|
|
|
with gr.Blocks(css=css) as interface: |
|
gr.Markdown("# πΏ Swap Mukham") |
|
gr.Markdown("### Face swap app based on insightface inswapper.") |
|
with gr.Row(): |
|
with gr.Row(): |
|
with gr.Column(scale=0.4): |
|
with gr.Tab("π Swap Condition"): |
|
swap_option = gr.Dropdown( |
|
swap_options_list, |
|
info="Choose which face or faces in the target image to swap.", |
|
multiselect=False, |
|
show_label=False, |
|
value=swap_options_list[0], |
|
interactive=True, |
|
) |
|
age = gr.Number( |
|
value=25, label="Value", interactive=True, visible=False |
|
) |
|
|
|
with gr.Tab("ποΈ Detection Settings"): |
|
detect_condition_dropdown = gr.Dropdown( |
|
detect_conditions, |
|
label="Condition", |
|
value=DETECT_CONDITION, |
|
interactive=True, |
|
info="This condition is only used when multiple faces are detected on source or specific image.", |
|
) |
|
detection_size = gr.Number( |
|
label="Detection Size", value=DETECT_SIZE, interactive=True |
|
) |
|
detection_threshold = gr.Number( |
|
label="Detection Threshold", |
|
value=DETECT_THRESH, |
|
interactive=True, |
|
) |
|
apply_detection_settings = gr.Button("Apply settings") |
|
|
|
with gr.Tab("π€ Output Settings"): |
|
output_directory = gr.Text( |
|
label="Output Directory", |
|
value=DEF_OUTPUT_PATH, |
|
interactive=True, |
|
) |
|
output_name = gr.Text( |
|
label="Output Name", value="Result", interactive=True |
|
) |
|
keep_output_sequence = gr.Checkbox( |
|
label="Keep output sequence", value=False, interactive=True |
|
) |
|
|
|
with gr.Tab("πͺ Other Settings"): |
|
face_scale = gr.Slider( |
|
label="Face Scale", |
|
minimum=0, |
|
maximum=2, |
|
value=1, |
|
interactive=True, |
|
) |
|
|
|
face_enhancer_name = gr.Dropdown( |
|
FACE_ENHANCER_LIST, label="Face Enhancer", value="NONE", multiselect=False, interactive=True |
|
) |
|
|
|
with gr.Accordion("Advanced Mask", open=False): |
|
enable_face_parser_mask = gr.Checkbox( |
|
label="Enable Face Parsing", |
|
value=False, |
|
interactive=True, |
|
) |
|
|
|
mask_include = gr.Dropdown( |
|
mask_regions.keys(), |
|
value=MASK_INCLUDE, |
|
multiselect=True, |
|
label="Include", |
|
interactive=True, |
|
) |
|
mask_soft_kernel = gr.Number( |
|
label="Soft Erode Kernel", |
|
value=MASK_SOFT_KERNEL, |
|
minimum=3, |
|
interactive=True, |
|
visible = False |
|
) |
|
mask_soft_iterations = gr.Number( |
|
label="Soft Erode Iterations", |
|
value=MASK_SOFT_ITERATIONS, |
|
minimum=0, |
|
interactive=True, |
|
|
|
) |
|
|
|
|
|
with gr.Accordion("Crop Mask", open=False): |
|
crop_top = gr.Slider(label="Top", minimum=0, maximum=511, value=0, step=1, interactive=True) |
|
crop_bott = gr.Slider(label="Bottom", minimum=0, maximum=511, value=511, step=1, interactive=True) |
|
crop_left = gr.Slider(label="Left", minimum=0, maximum=511, value=0, step=1, interactive=True) |
|
crop_right = gr.Slider(label="Right", minimum=0, maximum=511, value=511, step=1, interactive=True) |
|
|
|
|
|
erode_amount = gr.Slider( |
|
label="Mask Erode", |
|
minimum=0, |
|
maximum=1, |
|
value=MASK_ERODE_AMOUNT, |
|
step=0.05, |
|
interactive=True, |
|
) |
|
|
|
blur_amount = gr.Slider( |
|
label="Mask Blur", |
|
minimum=0, |
|
maximum=1, |
|
value=MASK_BLUR_AMOUNT, |
|
step=0.05, |
|
interactive=True, |
|
) |
|
|
|
enable_laplacian_blend = gr.Checkbox( |
|
label="Laplacian Blending", |
|
value=True, |
|
interactive=True, |
|
) |
|
|
|
|
|
source_image_input = gr.Image( |
|
label="Source face", type="filepath", interactive=True |
|
) |
|
|
|
with gr.Group(visible=False) as specific_face: |
|
for i in range(NUM_OF_SRC_SPECIFIC): |
|
idx = i + 1 |
|
code = "\n" |
|
code += f"with gr.Tab(label='({idx})'):" |
|
code += "\n\twith gr.Row():" |
|
code += f"\n\t\tsrc{idx} = gr.Image(interactive=True, type='numpy', label='Source Face {idx}')" |
|
code += f"\n\t\ttrg{idx} = gr.Image(interactive=True, type='numpy', label='Specific Face {idx}')" |
|
exec(code) |
|
|
|
distance_slider = gr.Slider( |
|
minimum=0, |
|
maximum=2, |
|
value=0.6, |
|
interactive=True, |
|
label="Distance", |
|
info="Lower distance is more similar and higher distance is less similar to the target face.", |
|
) |
|
|
|
with gr.Group(): |
|
input_type = gr.Radio( |
|
["Image", "Video"], |
|
label="Target Type", |
|
value="Image", |
|
) |
|
|
|
with gr.Group(visible=True) as input_image_group: |
|
image_input = gr.Image( |
|
label="Target Image", interactive=True, type="filepath" |
|
) |
|
|
|
with gr.Group(visible=False) as input_video_group: |
|
vid_widget = gr.Video if USE_COLAB else gr.Text |
|
video_input = gr.Video( |
|
label="Target Video", interactive=True |
|
) |
|
with gr.Accordion("✂️ Trim video", open=False):
|
with gr.Column(): |
|
with gr.Row(): |
|
set_slider_range_btn = gr.Button( |
|
"Set frame range", interactive=True |
|
) |
|
show_trim_preview_btn = gr.Checkbox( |
|
label="Show frame when slider change", |
|
value=True, |
|
interactive=True, |
|
) |
|
|
|
video_fps = gr.Number( |
|
value=30, |
|
interactive=False, |
|
label="Fps", |
|
visible=False, |
|
) |
|
start_frame = gr.Slider( |
|
minimum=0, |
|
maximum=1, |
|
value=0, |
|
step=1, |
|
interactive=True, |
|
label="Start Frame", |
|
info="", |
|
) |
|
end_frame = gr.Slider( |
|
minimum=0, |
|
maximum=1, |
|
value=1, |
|
step=1, |
|
interactive=True, |
|
label="End Frame", |
|
info="", |
|
) |
|
trim_and_reload_btn = gr.Button( |
|
"Trim and Reload", interactive=True |
|
) |
|
|
|
with gr.Group(visible=False) as input_directory_group: |
|
direc_input = gr.Text(label="Path", interactive=True) |
|
|
|
with gr.Column(scale=0.6): |
|
info = gr.Markdown(value="...") |
|
|
|
with gr.Row(): |
|
swap_button = gr.Button("✨ Swap", variant="primary")
|
cancel_button = gr.Button("❌ Cancel")
|
|
|
preview_image = gr.Image(label="Output", interactive=False) |
|
preview_video = gr.Video( |
|
label="Output", interactive=False, visible=False |
|
) |
|
|
|
with gr.Row(): |
|
output_directory_button = gr.Button( |
|
"π", interactive=False, visible=False |
|
) |
|
output_video_button = gr.Button( |
|
"π¬", interactive=False, visible=False |
|
) |
|
|
|
with gr.Group(): |
|
with gr.Row(): |
|
gr.Markdown( |
|
"### [π€ Sponsor](https://github.com/sponsors/harisreedhar)" |
|
) |
|
gr.Markdown( |
|
"### [π¨βπ» Source code](https://github.com/harisreedhar/Swap-Mukham)" |
|
) |
|
gr.Markdown( |
|
"### [β οΈ Disclaimer](https://github.com/harisreedhar/Swap-Mukham#disclaimer)" |
|
) |
|
gr.Markdown( |
|
"### [π Run in Colab](https://colab.research.google.com/github/harisreedhar/Swap-Mukham/blob/main/swap_mukham_colab.ipynb)" |
|
) |
|
gr.Markdown( |
|
"### [π€ Acknowledgements](https://github.com/harisreedhar/Swap-Mukham#acknowledgements)" |
|
) |
|
|
|
## |
|
|
|
set_slider_range_event = set_slider_range_btn.click( |
|
video_changed, |
|
inputs=[video_input], |
|
outputs=[start_frame, end_frame, video_fps], |
|
) |
|
|
|
trim_and_reload_event = trim_and_reload_btn.click( |
|
fn=trim_and_reload, |
|
inputs=[video_input, output_directory, output_name, start_frame, end_frame], |
|
outputs=[video_input, info], |
|
) |
|
|
|
start_frame_event = start_frame.release( |
|
fn=slider_changed, |
|
inputs=[show_trim_preview_btn, video_input, start_frame], |
|
outputs=[preview_image, preview_video], |
|
show_progress=True, |
|
) |
|
|
|
end_frame_event = end_frame.release( |
|
fn=slider_changed, |
|
inputs=[show_trim_preview_btn, video_input, end_frame], |
|
outputs=[preview_image, preview_video], |
|
show_progress=True, |
|
) |
|
|
|
input_type.change( |
|
update_radio, |
|
inputs=[input_type], |
|
outputs=[input_image_group, input_video_group, input_directory_group], |
|
) |
|
swap_option.change( |
|
swap_option_changed, |
|
inputs=[swap_option], |
|
outputs=[age, specific_face, source_image_input], |
|
) |
|
|
|
apply_detection_settings.click( |
|
analyse_settings_changed, |
|
inputs=[detect_condition_dropdown, detection_size, detection_threshold], |
|
outputs=[info], |
|
) |
|
|
|
src_specific_inputs = [] |
|
gen_variable_txt = ",".join( |
|
[f"src{i+1}" for i in range(NUM_OF_SRC_SPECIFIC)] |
|
+ [f"trg{i+1}" for i in range(NUM_OF_SRC_SPECIFIC)] |
|
) |
|
exec(f"src_specific_inputs = ({gen_variable_txt})") |
|
swap_inputs = [ |
|
input_type, |
|
image_input, |
|
video_input, |
|
direc_input, |
|
source_image_input, |
|
output_directory, |
|
output_name, |
|
keep_output_sequence, |
|
swap_option, |
|
age, |
|
distance_slider, |
|
face_enhancer_name, |
|
enable_face_parser_mask, |
|
mask_include, |
|
mask_soft_kernel, |
|
mask_soft_iterations, |
|
blur_amount, |
|
erode_amount, |
|
face_scale, |
|
enable_laplacian_blend, |
|
crop_top, |
|
crop_bott, |
|
crop_left, |
|
crop_right, |
|
*src_specific_inputs, |
|
] |
|
|
|
swap_outputs = [ |
|
info, |
|
preview_image, |
|
output_directory_button, |
|
output_video_button, |
|
preview_video, |
|
] |
|
|
|
swap_event = swap_button.click( |
|
fn=process, inputs=swap_inputs, outputs=swap_outputs, show_progress=True |
|
) |
|
|
|
cancel_button.click( |
|
fn=stop_running, |
|
inputs=None, |
|
outputs=[info], |
|
cancels=[ |
|
swap_event, |
|
trim_and_reload_event, |
|
set_slider_range_event, |
|
start_frame_event, |
|
end_frame_event, |
|
], |
|
show_progress=True, |
|
) |
|
output_directory_button.click( |
|
lambda: open_directory(path=WORKSPACE), inputs=None, outputs=None |
|
) |
|
output_video_button.click( |
|
lambda: open_directory(path=OUTPUT_FILE), inputs=None, outputs=None |
|
) |
|
|
|
if __name__ == "__main__": |
|
if USE_COLAB: |
|
print("Running in colab mode") |
|
|
|
interface.launch() |
|
|
|
|
|
#### APP.PY CODE END ### |
|
|