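# NOTE: the next three lines are file-viewer page text captured with the source, not program code.
# commune-ai's picture
# Rename fofr-face-to-sticker to app.py
# 9cf24d4 verified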
import base64
import os
import time

import gradio as gr
import requests
from requests.exceptions import ConnectTimeout
# Shared module-level state used by the UI-building code and the callbacks.
# (`global` statements at module scope are no-ops, so none are needed here.)
property_name_array = []  # model input property names, in widget order
output_image = ''         # example output; shown initially and restored on cancel
path = ''                 # not referenced anywhere in the visible code
cancel_url = ''           # any non-empty value tells the polling loop to stop

# SECURITY: the Replicate API token must never be committed to source control.
# It is read from the environment instead; the token that used to be
# hard-coded here should be treated as leaked and revoked.
_replicate_api_token = os.environ.get("REPLICATE_API_TOKEN", "")
if not _replicate_api_token:
    print("Warning: REPLICATE_API_TOKEN is not set; "
          "authenticated Replicate API requests are likely to fail.")

# Request headers for the Replicate HTTP API.  The former
# "Access-Control-*" entries were CORS *response* headers, which carry no
# meaning on an outgoing request, so they are not sent.
headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Token {_replicate_api_token}',
}
def _fetch_model_metadata(owner, name, max_retries=3, retry_delay=2):
    """Return the Replicate model description for ``owner/name`` as a dict.

    Connection timeouts are retried up to ``max_retries`` times with
    ``retry_delay`` seconds between attempts.

    Raises:
        RuntimeError: if every attempt timed out (previously this surfaced
            as a NameError on the unbound ``response``).
        requests.HTTPError: if the API answers with an error status.
    """
    url = f'https://api.replicate.com/v1/models/{owner}/{name}'
    for retry in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=10)
        except ConnectTimeout:
            if retry < max_retries - 1:
                print(f"Connection timed out. Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
            else:
                print("Max retries exceeded. Unable to establish connection.")
        else:
            response.raise_for_status()
            return response.json()
    raise RuntimeError(f"Unable to fetch model metadata from {url}")


def _file_to_data_uri(file_path):
    """Read ``file_path`` and return its content as a base64 ``data:`` URI."""
    import mimetypes  # local import: only this helper needs it

    # Guess the type from the file extension; fall back to the label the
    # original code always used when the extension is unknown.
    mimetype = mimetypes.guess_type(file_path)[0] or "image/jpg"
    with open(file_path, "rb") as file:
        base64_data = base64.b64encode(file.read()).decode("utf-8")
    return f"data:{mimetype};base64,{base64_data}"


def _build_input_component(property_name, property_info, label, example_input, schema):
    """Create the Gradio widget for one property of the model's Input schema.

    The initial value is the model's example input when it has one,
    otherwise the schema ``default``.  Must be called inside an active
    Blocks layout context so the widget is rendered where it is created.
    """
    description = property_info.get('description', '')
    kind = property_info.get("type")

    if kind in ("integer", "number"):
        value = example_input.get(property_name, 0)
        if value == 0:
            value = property_info.get('default', value)
        # A slider needs BOTH bounds.  The original test
        # `"minimum" and "maximum" in property_info` only checked "maximum".
        if "minimum" in property_info and "maximum" in property_info:
            extra = {"step": 1} if kind == "integer" else {}
            return gr.Slider(label=label, info=description, value=value,
                             minimum=property_info["minimum"],
                             maximum=property_info["maximum"], **extra)
        return gr.Number(label=label, info=description, value=value)

    if kind == "string":
        value = example_input.get(property_name, '')
        if property_info.get('format', '') == 'uri':
            if value:
                return gr.Image(label=label, value=value, type="filepath")
            return gr.Image(label=label, type="filepath")
        if value == '':
            value = property_info.get('default', value)
        return gr.Textbox(label=label, info=description, value=value)

    if kind == "boolean":
        value = example_input.get(property_name, '')
        if value == '':
            value = property_info.get('default', False)
        return gr.Checkbox(label=label, info=description, value=value)

    # Anything else is treated as an enum whose choices are listed under a
    # schema entry named after the property.
    value = example_input.get(property_name, '')
    options = schema.get(property_name, {}).get('enum', [])
    if not value:
        # Prefer the example value and fall back to the schema default,
        # matching the other branches (the original had this inverted).
        value = property_info.get("default")
    return gr.Dropdown(label=property_name, info=description, choices=options, value=value)


# NOTE(review): the source reached review with its indentation stripped; the
# nesting below (callbacks and event wiring inside the Blocks context, buttons
# row inside the inputs column) is reconstructed from the statement order.
with gr.Blocks() as demo:
    owner = "fofr"
    name = "face-to-sticker"
    data = _fetch_model_metadata(owner, name)

    # `or {}` also covers keys that are present but null in the JSON.
    default_example = data.get("default_example") or {}
    example_input = default_example.get("input") or {}
    description = data.get("description", '')
    title = default_example.get("model", '')
    version = default_example.get("version", '')
    gr.Markdown(f"\n# {title}\n{description}\n")

    schema = (data.get("latest_version") or {}).get("openapi_schema", {}).get("components", {}).get("schemas", {})
    input_schema = schema.get("Input", {})
    ordered_properties = sorted(input_schema.get("properties", {}).items(),
                                key=lambda x: x[1].get("x-order", 0))
    required = input_schema.get('required', [])
    uri_property_names = set()  # inputs that arrive from the UI as local file paths

    with gr.Row():
        with gr.Column():
            inputs = []
            for property_name, property_info in ordered_properties:
                property_name_array.append(property_name)
                if property_info.get("type") == "string" and property_info.get("format") == "uri":
                    uri_property_names.add(property_name)
                # Required inputs are marked with a leading asterisk.
                marker = "*" if property_name in required else ""
                label = marker + property_info.get('title', '')
                # Appending in sorted order keeps inputs[i] aligned with
                # property_name_array[i] even if "x-order" is missing or sparse.
                inputs.append(_build_input_component(
                    property_name, property_info, label, example_input, schema))
            with gr.Row():
                cancel_btn = gr.Button("Cancel")
                run_btn = gr.Button("Run")
        with gr.Column():
            outputs = []
            output_result = default_example.get("output")
            output_type = schema.get("Output", {}).get("type", '')
            if output_type == 'array':
                output_image = output_result[0] if output_result else None
            else:
                output_image = output_result
            outputs.append(gr.Image(value=output_image))
            outputs.append(gr.Image(visible=False))
            outputs.append(gr.Image(visible=False))
            outputs.append(gr.Image(visible=False))

    def _no_result_update():
        """Leave the first image untouched and hide the three extra slots."""
        return gr.Image(), gr.Image(visible=False), gr.Image(visible=False), gr.Image(visible=False)

    def run_process(*input_values):
        """Start a prediction from the current widget values and show its images.

        ``input_values`` arrive in the same order as ``property_name_array``.
        Accepting ``*input_values`` removes the former hard-wired limit of
        exactly 13 inputs.  Returns one update per entry of ``outputs``.
        """
        global cancel_url
        cancel_url = ''  # clear a cancellation left over from a previous run
        url = 'https://replicate.com/api/predictions'

        prediction_input = {}
        for input_name, raw_value in zip(property_name_array, input_values):
            if input_name in uri_property_names:
                # File inputs are local paths; send them inline as data URIs.
                prediction_input[input_name] = _file_to_data_uri(raw_value) if raw_value else None
            else:
                prediction_input[input_name] = raw_value

        body = {"version": version, "input": prediction_input}
        response = requests.post(url, json=body, timeout=30)
        print(response.status_code)
        if response.status_code != 201:
            return _no_result_update()

        get_url = (response.json().get('urls') or {}).get('get', '')
        identifier = 'https://replicate.com/api/predictions/' + get_url.split("/")[-1]
        time.sleep(3)  # give the prediction a head start before polling
        output = verify_image(identifier)
        if not output:
            return _no_result_update()
        if isinstance(output, str):
            # Non-array models return one URL; indexing it would yield characters.
            output = [output]

        updates = [gr.Image(value=output[0])]
        for slot in range(1, 4):
            if slot < len(output):
                updates.append(gr.Image(value=output[slot], visible=True))
            else:
                # Hide unused slots so images from an earlier run do not linger.
                updates.append(gr.Image(visible=False))
        return tuple(updates)

    def cancel_process(*_input_values):
        """Signal the polling loop to stop and restore the example output.

        The widget values passed by the event wiring are ignored.
        """
        global cancel_url
        cancel_url = '123'  # any non-empty value means "cancel requested"
        return gr.Image(value=output_image), gr.Image(visible=False), gr.Image(visible=False), gr.Image(visible=False)

    def verify_image(get_url):
        """Poll ``get_url`` once per second until the prediction yields output.

        Returns the prediction's ``output`` value, ``None`` if the response
        reports an error or cancellation was requested, or ``[]`` if the
        request failed or the prediction ended without output.  Implemented
        as a loop; the original recursed once per poll without bound.
        """
        while True:
            res = requests.get(get_url, timeout=30)
            if res.status_code != 200:
                return []
            res_data = res.json()
            if res_data.get('error', ''):
                return None
            if cancel_url:
                return None
            output = res_data.get('output', [])
            if output:
                return output
            # Assumes the response carries Replicate's prediction "status"
            # field — TODO confirm for this endpoint.  If absent, polling
            # simply continues as before.
            if res_data.get('status') in ('succeeded', 'failed', 'canceled'):
                return []
            time.sleep(1)

    run_btn.click(run_process, inputs=inputs, outputs=outputs, api_name="run")
    cancel_btn.click(cancel_process, inputs=inputs, outputs=outputs, api_name="cancel")

demo.launch()