Bassamejlaoui commited on
Commit
a1d86a2
1 Parent(s): 5d862a2

Upload Swaper.py

Browse files
Files changed (1) hide show
  1. Swaper.py +131 -0
Swaper.py ADDED
@@ -0,0 +1,131 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+ from prodiapy import Prodia
3
+ from PIL import Image
4
+ from io import BytesIO
5
+ import requests
6
+ import random
7
+ import os
8
+ import base64
9
+ import json
10
+ import time
11
+
12
+
13
+ class Prodia:
14
+ def __init__(self, api_key=os.getenv("PRODIA_API_KEY"), base=None):
15
+ self.base = base or "https://api.prodia.com/v1"
16
+ self.headers = {
17
+ "X-Prodia-Key": api_key
18
+ }
19
+
20
+ def faceswap(self, params):
21
+ response = self._post(f"{self.base}/faceswap", params)
22
+ return response.json()
23
+
24
+ def get_job(self, job_id):
25
+ response = self._get(f"{self.base}/job/{job_id}")
26
+ return response.json()
27
+
28
+ def wait(self, job):
29
+ job_result = job
30
+
31
+ while job_result['status'] not in ['succeeded', 'failed']:
32
+ time.sleep(0.25)
33
+ job_result = self.get_job(job['job'])
34
+
35
+ return job_result
36
+
37
+ def _post(self, url, params):
38
+ headers = {
39
+ **self.headers,
40
+ "Content-Type": "application/json"
41
+ }
42
+ response = requests.post(url, headers=headers, data=json.dumps(params))
43
+
44
+ if response.status_code != 200:
45
+ raise Exception(f"Bad Prodia Response: {response.status_code}")
46
+
47
+ return response
48
+
49
+ def _get(self, url):
50
+ response = requests.get(url, headers=self.headers)
51
+
52
+ if response.status_code != 200:
53
+ raise Exception(f"Bad Prodia Response: {response.status_code}")
54
+
55
+ return response
56
+
57
+
58
+ client = Prodia()
59
+
60
+
61
+ def infer(source, target):
62
+ if source_image is None or target_image is None:
63
+ return
64
+
65
+ source_url = upload_image(source)
66
+ target_url = upload_image(target)
67
+
68
+ job = client.faceswap({
69
+ "sourceUrl": source_url,
70
+ "targetUrl": target_url
71
+ })
72
+ res = client.wait(job)
73
+
74
+ if res['status'] == "failed":
75
+ return
76
+
77
+ return res['imageUrl']
78
+
79
+
80
+ def upload_image(file):
81
+ files = {'file': open(file, 'rb')}
82
+ img_id = requests.post(os.getenv("IMAGE_API_1"), files=files).json()['id']
83
+
84
+ payload = {
85
+ "content": "",
86
+ "nonce": f"{random.randint(1, 10000000)}H9X42KSEJFNNH",
87
+ "replies": [],
88
+ "attachments": [img_id]
89
+ }
90
+ res = requests.post(os.getenv("IMAGE_API_2"), json=payload, headers={"x-session-token": os.getenv("SESSION_TOKEN")})
91
+
92
+ return f"{os.getenv('IMAGE_API_1')}/{img_id}/{res.json()['attachments'][0]['filename']}"
93
+
94
+
95
+ def image_to_base64(image: Image):
96
+ # Convert the image to bytes
97
+ buffered = BytesIO()
98
+ image.save(buffered, format="PNG") # You can change format to PNG if needed
99
+
100
+ # Encode the bytes to base64
101
+ img_str = base64.b64encode(buffered.getvalue())
102
+
103
+ return img_str.decode('utf-8') # Convert bytes to string
104
+
105
+
106
+ with gr.Blocks() as demo:
107
+ with gr.Column():
108
+ gr.HTML("<h1><center>Face Swap</center></h1>")
109
+
110
+ with gr.Row():
111
+ with gr.Row():
112
+ source_image = gr.Image(type="filepath", label="Source Image")
113
+ target_image = gr.Image(type="filepath", label="Target Image")
114
+ with gr.Column():
115
+ result = gr.Image()
116
+ run_button = gr.Button("Swap Face", variant="primary")
117
+
118
+ gr.Examples(
119
+ examples=[
120
+ ["example1.jpg", "example2.jpg"],
121
+ ["example3.jpg", "example4.jpg"],
122
+ ["example5.jpg", "example6.jpg"]
123
+ ],
124
+ fn=infer,
125
+ inputs=[source_image, target_image],
126
+ outputs=[result]
127
+ )
128
+
129
+ run_button.click(fn=infer, inputs=[source_image, target_image], outputs=[result])
130
+
131
+ demo.queue(max_size=20, api_open=False).launch(show_api=False, max_threads=400)