File size: 17,050 Bytes
8eb64f2
be56d2f
f1611e7
 
 
 
be56d2f
 
f1611e7
be56d2f
 
1c18857
f1611e7
 
1c18857
be56d2f
f1611e7
be56d2f
 
 
 
1c18857
 
 
 
 
 
be56d2f
 
 
 
1c18857
8eb64f2
 
1c18857
be56d2f
 
 
 
 
 
 
 
 
 
 
 
0cffa18
 
be56d2f
8eb64f2
be56d2f
 
 
 
 
74329f2
 
 
 
 
 
 
 
be56d2f
 
 
 
 
 
 
74329f2
 
 
 
 
be56d2f
 
 
 
 
 
 
 
 
 
 
 
 
74329f2
 
 
 
 
be56d2f
74329f2
 
 
be56d2f
 
 
 
 
 
 
 
 
 
 
74329f2
 
 
 
 
be56d2f
74329f2
be56d2f
74329f2
 
be56d2f
74329f2
1c18857
f1611e7
be56d2f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8eb64f2
be56d2f
 
 
8eb64f2
 
be56d2f
0cffa18
74329f2
 
be56d2f
0cffa18
 
be56d2f
 
 
 
 
 
 
f1611e7
8eb64f2
 
be56d2f
 
 
 
8eb64f2
be56d2f
8eb64f2
be56d2f
 
 
 
8eb64f2
be56d2f
8eb64f2
be56d2f
8eb64f2
be56d2f
8eb64f2
be56d2f
 
16372c8
be56d2f
 
 
 
 
 
 
 
 
 
 
 
8eb64f2
be56d2f
74329f2
be56d2f
 
 
74329f2
be56d2f
 
 
 
 
 
 
 
 
 
 
 
 
0cffa18
be56d2f
0cffa18
be56d2f
 
 
 
0cffa18
f1611e7
 
be56d2f
 
 
f1611e7
be56d2f
 
 
f1611e7
be56d2f
 
 
 
 
f1611e7
be56d2f
 
 
f1611e7
 
be56d2f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f1611e7
 
be56d2f
 
 
 
 
 
 
69fbad0
 
 
 
 
 
 
 
 
 
 
 
 
da2e88a
f1611e7
 
69fbad0
f1611e7
69fbad0
 
 
 
 
0cffa18
 
 
be56d2f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0cffa18
 
 
 
be56d2f
69fbad0
0cffa18
 
 
be56d2f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0cffa18
 
 
 
be56d2f
69fbad0
 
 
 
be56d2f
 
 
 
 
69fbad0
 
8eb64f2
be56d2f
69fbad0
be56d2f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import os
import json
import shutil
import logging
import tempfile
from datetime import datetime
from typing import Tuple, Optional

import numpy as np
import cv2
from PIL import Image
import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import HfApi, login
from insightface.app import FaceAnalysis

import roop.globals
from roop.core import (
    start,
    decode_execution_providers,
    suggest_max_memory,
    suggest_execution_threads,
)
from roop.processors.frame.core import get_frame_processors_modules
from roop.utilities import normalize_output_path

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Load environment variables
load_dotenv()


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """
    Calculate the cosine similarity between two vectors.
    
    Parameters:
        a (np.ndarray): First vector.
        b (np.ndarray): Second vector.
        
    Returns:
        float: Cosine similarity.
    """
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-6)


class FaceIntegrDataset:
    """
    Handler for face integration dataset upload to Hugging Face.
    """
    def __init__(self, repo_id: str = "Arrcttacsrks/face_integrData") -> None:
        self.token = os.getenv('hf_token')
        if not self.token:
            raise ValueError("HF_TOKEN environment variable is not set")
        self.repo_id = repo_id
        self.api = HfApi()
        login(self.token)
        self.temp_dir = "temp_dataset"
        os.makedirs(self.temp_dir, exist_ok=True)

    def create_date_folder(self) -> Tuple[str, str]:
        """
        Create a folder based on the current date inside the temporary directory.
        
        Returns:
            Tuple[str, str]: The folder path and the current date string.
        """
        current_date = datetime.now().strftime("%Y-%m-%d")
        folder_path = os.path.join(self.temp_dir, current_date)
        os.makedirs(folder_path, exist_ok=True)
        return folder_path, current_date

    def save_metadata(self, source_path: str, target_path: str, output_path: str, timestamp: str) -> dict:
        """
        Create metadata dictionary for the face swap process.
        
        Parameters:
            source_path (str): Filename of the source image.
            target_path (str): Filename of the target image.
            output_path (str): Filename of the output image.
            timestamp (str): Timestamp string.
            
        Returns:
            dict: Metadata information.
        """
        metadata = {
            "timestamp": timestamp,
            "source_image": source_path,
            "target_image": target_path,
            "output_image": output_path,
            "date_created": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return metadata

    def upload_to_hf(self, local_folder: str, date_folder: str) -> bool:
        """
        Upload a local folder to the Hugging Face dataset repository.
        
        Parameters:
            local_folder (str): The local folder path.
            date_folder (str): The subfolder in the repository.
            
        Returns:
            bool: True if upload is successful, False otherwise.
        """
        try:
            self.api.upload_folder(
                folder_path=local_folder,
                repo_id=self.repo_id,
                repo_type="dataset",
                path_in_repo=date_folder
            )
            logging.info("Successfully uploaded files to Hugging Face repository.")
            return True
        except Exception as e:
            logging.error(f"Error uploading to Hugging Face: {str(e)}")
            return False


def configure_roop_globals(source_path: str, target_path: str, output_path: str, do_face_enhancer: bool) -> None:
    """
    Configure global variables required for the face swap process.
    
    Parameters:
        source_path (str): Path to the source image.
        target_path (str): Path to the target image.
        output_path (str): Path to save the output image.
        do_face_enhancer (bool): Flag to determine if face enhancer should be used.
    """
    roop.globals.source_path = source_path
    roop.globals.target_path = target_path
    roop.globals.output_path = normalize_output_path(source_path, target_path, output_path)
    roop.globals.frame_processors = ["face_swapper", "face_enhancer"] if do_face_enhancer else ["face_swapper"]
    roop.globals.headless = True
    roop.globals.keep_fps = True
    roop.globals.keep_audio = True
    roop.globals.keep_frames = False
    roop.globals.many_faces = False
    roop.globals.video_encoder = "libx264"
    roop.globals.video_quality = 18
    roop.globals.max_memory = suggest_max_memory()
    roop.globals.execution_providers = decode_execution_providers(["cuda"])
    roop.globals.execution_threads = suggest_execution_threads()


def swap_face(source_file: np.ndarray, target_file: np.ndarray, doFaceEnhancer: bool) -> Optional[np.ndarray]:
    """
    Perform face swapping on static images.
    
    Parameters:
        source_file (np.ndarray): Source image array.
        target_file (np.ndarray): Target image array.
        doFaceEnhancer (bool): Flag to apply face enhancer.
        
    Returns:
        Optional[np.ndarray]: The output image array if successful, otherwise None.
    """
    folder_path = None
    try:
        dataset_handler = FaceIntegrDataset()
        folder_path, date_folder = dataset_handler.create_date_folder()
        timestamp = datetime.now().strftime("%S-%M-%H-%d-%m-%Y")
        source_path = os.path.join(folder_path, f"source_{timestamp}.jpg")
        target_path = os.path.join(folder_path, f"target_{timestamp}.jpg")
        output_path = os.path.join(folder_path, f"OutputImage{timestamp}.jpg")

        if source_file is None or target_file is None:
            raise ValueError("Source and target images are required")
            
        Image.fromarray(source_file).save(source_path)
        Image.fromarray(target_file).save(target_path)
        
        logging.info(f"Source image saved at: {source_path}")
        logging.info(f"Target image saved at: {target_path}")
        
        # Configure global parameters for roop
        configure_roop_globals(source_path, target_path, output_path, doFaceEnhancer)
        
        # Pre-check frame processors
        for frame_processor in get_frame_processors_modules(roop.globals.frame_processors):
            if not frame_processor.pre_check():
                logging.error("Pre-check failed for frame processor.")
                return None
        
        logging.info("Starting face swap process...")
        start()
        
        metadata = dataset_handler.save_metadata(
            os.path.basename(source_path),
            os.path.basename(target_path),
            os.path.basename(output_path),
            timestamp
        )
        
        metadata_path = os.path.join(folder_path, f"metadata_{timestamp}.json")
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=4)
        
        upload_success = dataset_handler.upload_to_hf(folder_path, date_folder)
        if upload_success:
            logging.info(f"Successfully uploaded files to dataset {dataset_handler.repo_id}")
        else:
            logging.error("Failed to upload files to Hugging Face dataset")
        
        if os.path.exists(roop.globals.output_path):
            output_image = Image.open(roop.globals.output_path)
            output_array = np.array(output_image)
            shutil.rmtree(folder_path, ignore_errors=True)
            return output_array
        else:
            logging.error("Output image not found")
            shutil.rmtree(folder_path, ignore_errors=True)
            return None
        
    except Exception as e:
        logging.exception(f"Error in face swap process: {str(e)}")
        if folder_path and os.path.exists(folder_path):
            shutil.rmtree(folder_path, ignore_errors=True)
        raise gr.Error(f"Face swap failed: {str(e)}")


def swap_face_frame(frame_bgr: np.ndarray, replacement_face_rgb: np.ndarray, doFaceEnhancer: bool) -> np.ndarray:
    """
    Swap face in a single video frame.
    
    Parameters:
        frame_bgr (np.ndarray): Video frame in BGR format.
        replacement_face_rgb (np.ndarray): Replacement face image in RGB format.
        doFaceEnhancer (bool): Flag to apply face enhancer.
        
    Returns:
        np.ndarray: Processed frame with face swapped (in RGB format).
    """
    # Convert BGR to RGB for processing
    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    temp_dir = tempfile.mkdtemp(prefix="temp_faceswap_frame_")
    timestamp = datetime.now().strftime("%S-%M-%H-%d-%m-%Y")
    source_path = os.path.join(temp_dir, f"source_{timestamp}.jpg")
    target_path = os.path.join(temp_dir, f"target_{timestamp}.jpg")
    output_path = os.path.join(temp_dir, f"OutputImage_{timestamp}.jpg")
    
    try:
        Image.fromarray(frame_rgb).save(source_path)
        Image.fromarray(replacement_face_rgb).save(target_path)
        
        configure_roop_globals(source_path, target_path, output_path, doFaceEnhancer)
        
        start()
        
        if os.path.exists(roop.globals.output_path):
            swapped_img = np.array(Image.open(roop.globals.output_path))
        else:
            logging.warning("Output image not found after face swap; returning original frame.")
            swapped_img = frame_rgb
    except Exception as e:
        logging.exception(f"Error in processing frame for face swap: {str(e)}")
        swapped_img = frame_rgb
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
    
    return swapped_img


def swap_face_video(reference_face: np.ndarray, replacement_face: np.ndarray, video_input: str,
                    similarity_threshold: float, doFaceEnhancer: bool) -> str:
    """
    Perform face swapping on a video frame-by-frame.
    
    Parameters:
        reference_face (np.ndarray): Reference face image (RGB) for face locking.
        replacement_face (np.ndarray): Replacement face image (RGB).
        video_input (str): Path to the input video file.
        similarity_threshold (float): Threshold for face similarity (0.0 - 1.0).
        doFaceEnhancer (bool): Flag to apply face enhancer.
        
    Returns:
        str: Path to the output video file.
        
    Raises:
        gr.Error: If face detection fails or video cannot be processed.
    """
    try:
        # Initialize insightface face analysis
        fa = FaceAnalysis()
        fa.prepare(ctx_id=0)
        
        # Get embedding for the reference face
        ref_detections = fa.get(reference_face)
        if not ref_detections:
            raise gr.Error("No face detected in the reference image!")
        ref_embedding = ref_detections[0].embedding
        
        # Open video input
        cap = cv2.VideoCapture(video_input)
        if not cap.isOpened():
            raise gr.Error("Cannot open the input video!")
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        output_video_path = "temp_faceswap_video.mp4"
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        
        frame_index = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            detections = fa.get(frame_rgb)
            swap_this_frame = any(
                cosine_similarity(det.embedding, ref_embedding) >= similarity_threshold
                for det in detections
            )
            
            if swap_this_frame:
                swapped_frame_rgb = swap_face_frame(frame, replacement_face, doFaceEnhancer)
                swapped_frame = cv2.cvtColor(swapped_frame_rgb, cv2.COLOR_RGB2BGR)
            else:
                swapped_frame = frame
            
            out.write(swapped_frame)
            frame_index += 1
            logging.info(f"Processed frame {frame_index}")
        
        cap.release()
        out.release()
        return output_video_path
    except Exception as e:
        logging.exception(f"Error processing video: {str(e)}")
        raise gr.Error(f"Face swap video failed: {str(e)}")


def create_interface() -> gr.Blocks:
    """
    Create and return the Gradio interface for face swapping.
    
    Returns:
        gr.Blocks: The Gradio interface.
    """
    custom_css = """
    .container {
        max-width: 1200px;
        margin: auto;
        padding: 20px;
    }
    .output-image {
        min-height: 400px;
        border: 1px solid #ccc;
        border-radius: 8px;
        padding: 10px;
    }
    """
    title = "Face - Integrator"
    description = "Upload source and target images to perform face swap."
    article = """
    <div style="text-align: center; max-width: 650px; margin: 40px auto;">
        <p>This tool performs face swapping with optional enhancement.</p>
    </div>
    """
    with gr.Blocks(title=title, css=custom_css) as app:
        gr.Markdown(f"<h1 style='text-align: center;'>{title}</h1>")
        gr.Markdown(description)
        with gr.Tabs():
            with gr.TabItem("FaceSwap Image"):
                with gr.Row():
                    with gr.Column(scale=1):
                        source_image = gr.Image(
                            label="Source Image",
                            type="numpy",
                            sources=["upload"]
                        )
                    with gr.Column(scale=1):
                        target_image = gr.Image(
                            label="Target Image",
                            type="numpy",
                            sources=["upload"]
                        )
                    with gr.Column(scale=1):
                        output_image = gr.Image(
                            label="Output Image",
                            type="numpy",
                            interactive=False,
                            elem_classes="output-image"
                        )
                with gr.Row():
                    enhance_checkbox = gr.Checkbox(
                        label="Apply Face Enhancer",
                        info="Improve image quality",
                        value=False
                    )
                with gr.Row():
                    process_btn = gr.Button(
                        "Process Face Swap",
                        variant="primary",
                        size="lg"
                    )
                process_btn.click(
                    fn=swap_face,
                    inputs=[source_image, target_image, enhance_checkbox],
                    outputs=output_image,
                    api_name="swap_face"
                )
            with gr.TabItem("FaceSwap Video"):
                gr.Markdown("<h2 style='text-align:center;'>FaceSwap Video</h2>")
                with gr.Row():
                    ref_image = gr.Image(
                        label="Reference Face Image (Lock Face)",
                        type="numpy",
                        sources=["upload"]
                    )
                    swap_image = gr.Image(
                        label="Replacement Face Image",
                        type="numpy",
                        sources=["upload"]
                    )
                video_input = gr.Video(
                    label="Input Video"
                )
                similarity_threshold = gr.Slider(
                    minimum=0.0,
                    maximum=1.0,
                    step=0.01,
                    value=0.7,
                    label="Similarity Threshold"
                )
                enhance_checkbox_video = gr.Checkbox(
                    label="Apply Face Enhancer",
                    info="Optional quality enhancement",
                    value=False
                )
                process_video_btn = gr.Button(
                    "Process FaceSwap Video",
                    variant="primary",
                    size="lg"
                )
                video_output = gr.Video(
                    label="Output Video"
                )
                process_video_btn.click(
                    fn=swap_face_video,
                    inputs=[ref_image, swap_image, video_input, similarity_threshold, enhance_checkbox_video],
                    outputs=video_output,
                    api_name="swap_face_video"
                )
        gr.Markdown(article)
    return app


def main() -> None:
    """
    Launch the Gradio interface.
    """
    app = create_interface()
    app.launch(share=False)


if __name__ == "__main__":
    main()