Spaces:
Sleeping
Sleeping
import os | |
import gradio as gr | |
import shutil | |
import numpy as np | |
from PIL import Image | |
import asyncio | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from gradio_client import Client | |
from fastapi import FastAPI, Request | |
from fastapi.staticfiles import StaticFiles | |
from starlette.responses import Response | |
import uvicorn | |
import requests | |
from urllib.parse import quote | |
from unicodedata import normalize | |
import json | |
import time | |
from html.parser import HTMLParser | |
try: | |
import cv2 | |
except ImportError: | |
os.system('pip install opencv-python') | |
import cv2 | |
root = os.path.dirname(os.path.abspath(__file__)) | |
textures_folder = os.path.join(root, 'textures') | |
os.makedirs(textures_folder, exist_ok=True) | |
valid_extensions = ['.jpeg', '.jpg', '.png'] | |
textures_repo = "https://huggingface.co/datasets/2ch/textures/resolve/main/" | |
textures_for_download = [ | |
f"{textures_repo}гауссовский_шум_и_мелкое_зерно.png?download=true", | |
f"{textures_repo}грязная_матрица.png?download=true", | |
f"{textures_repo}для_ночных_и_тёмных_кадров_сильный_шум_и_пыль.png?download=true", | |
f"{textures_repo}для_ночных_и_тёмных_кадров_царапины_шум_пыль_дымка.png?download=true", | |
f"{textures_repo}для_светлых_и_солнечных_ярких_фото_мелкое_констрастное_зерно.png?download=true", | |
f"{textures_repo}зернистость_плёнки.png?download=true", | |
f"{textures_repo}зернистость_плёнки_с_грязью.png?download=true", | |
f"{textures_repo}испорченная_ворсом_плёнка.png?download=true", | |
f"{textures_repo}мелкий_цветной_шум.png?download=true", | |
f"{textures_repo}мелкое_контрастное_зерно_и_средний_цветвой_шум.png?download=true", | |
f"{textures_repo}очень_мелкое_зерно.png?download=true", | |
f"{textures_repo}пыльная_плёнка.png?download=true", | |
f"{textures_repo}сильный_цветовой_шум_для_ночных_фото.png?download=true", | |
f"{textures_repo}слабый_естественный_шум_матрицы_смартфона.png?download=true", | |
f"{textures_repo}среднее_зерно.png?download=true", | |
f"{textures_repo}среднее_монохромное_зерно_пыль_и_ворсинки.png?download=true", | |
f"{textures_repo}средний_цветной_шум.png?download=true", | |
f"{textures_repo}старая_матрица.png?download=true", | |
f"{textures_repo}старая_потёртая_плёнка.png?download=true", | |
f"{textures_repo}цветной_шум_матрицы.png?download=true", | |
f"{textures_repo}цветной_шум_на_плёнке.png?download=true", | |
f"{textures_repo}шумная_матрица.png?download=true", | |
] | |
def dl_textures(texture_url: str) -> None: | |
texture_for_download = quote(normalize('NFD', texture_url), safe='/?:=') | |
filename = texture_url.split('/')[-1].split('?')[0] | |
file_path = os.path.join(textures_folder, filename) | |
response = requests.get(texture_for_download, stream=True) | |
response.raise_for_status() | |
with open(file_path, 'wb') as f: | |
for chunk in response.iter_content(chunk_size=8192): | |
f.write(chunk) | |
def create_texture_preview(texture_folder: str, output_folder: str, size: tuple[int] = (246, 246)) -> None: | |
os.makedirs(output_folder, exist_ok=True) | |
for texture in os.listdir(texture_folder): | |
img_path = os.path.join(texture_folder, texture) | |
img = cv2.imread(img_path, cv2.IMREAD_UNCHANGED) | |
start_x = np.random.randint(0, img.shape[1] - size[1]) | |
start_y = np.random.randint(0, img.shape[0] - size[0]) | |
img = img[start_y:start_y + size[0], start_x:start_x + size[1]] | |
cv2.imwrite(os.path.join(output_folder, texture), img) | |
def prepare_textures(texture_folder: str, output_folder: str) -> None: | |
with ThreadPoolExecutor(max_workers=len(textures_for_download)) as executor: | |
futures = [executor.submit(dl_textures, texture_for_download) for texture_for_download in | |
textures_for_download] | |
for future in as_completed(futures): | |
future.result() | |
create_texture_preview(texture_folder, output_folder, size=(246, 246)) | |
preview_css = "" | |
prepare_textures(textures_folder, os.path.join(root, 'preview')) | |
for i, texture in enumerate(os.listdir(textures_folder), start=1): | |
if os.path.splitext(texture)[1].lower() in valid_extensions: | |
preview_css += f"""[data-testid="{i:02d}-radio-label"]::before {{ | |
background-color: transparent !important; | |
background-image: url("./preview/{texture}") !important; | |
}}\n""" | |
radio_css = """ | |
html, | |
body { | |
background: var(--body-background-fill); | |
} | |
.gradio-container { | |
max-width: 1396px !important; | |
} | |
#textures label { | |
position: relative; | |
width: 256px; | |
height: 256px; | |
display: flex; | |
flex-direction: row; | |
align-items: flex-end; | |
background: none !important; | |
padding: 4px !important; | |
transition: .3s; | |
} | |
#textures label::before { | |
width: 246px; | |
height: 246px; | |
border-radius: 8px; | |
display: block; | |
content: ""; | |
transition: .3s; | |
background: red; | |
position: relative; | |
top: 0px; | |
} | |
#textures label:hover::before, | |
#textures label:active::before, | |
#textures label.selected::before { | |
mix-blend-mode: soft-light; | |
transition: .3s | |
} | |
#textures span:not([data-testid="block-info"]), | |
#textures input { | |
position: absolute; | |
z-index: 999; | |
} | |
#textures input { | |
position: absolute; | |
z-index: 999; | |
bottom: 9px; | |
left: 9px; | |
} | |
#textures span:not([data-testid="block-info"]) { | |
left: 21px; | |
padding: 2px 8px; | |
background: rgba(0, 0, 0, .57); | |
backdrop-filter: blur(3px) | |
} | |
#textures { | |
background-color: hsla(0, 0%, 50%, 1); | |
} | |
.built-with, | |
.show-api, | |
footer .svelte-mpyp5e { | |
display: none !important; | |
} | |
footer:after { | |
content: "ну пролапс, ну и что?"; | |
} | |
#zoom { | |
position: absolute; | |
top: 50%; | |
left: 50%; | |
width: 250px; | |
height: 250px; | |
background-repeat: no-repeat; | |
box-shadow: 0px 0px 10px 5px rgba(0, 0, 0, .2); | |
border-radius: 50%; | |
cursor: none; | |
pointer-events: none; | |
z-index: 999; | |
opacity: 0; | |
transform: scale(0); | |
transition: opacity 500ms, transform 500ms; | |
} | |
#textures_tab .image-button { | |
cursor: none; | |
} | |
#textured_result-download-link, | |
#restored_image-download-link, | |
#upscaled_image-download-link { | |
position: absolute; | |
z-index: 9999; | |
padding: 2px 4px; | |
margin: 0 7px; | |
background: black; | |
bottom: 0; | |
right: 0; | |
font-size: 20px; | |
transition: 300ms | |
} | |
#textured_result-download-link:hover, | |
#restored_image-download-link:hover, | |
#upscaled_image-download-link:hover { | |
color: #99f7a8 | |
} | |
#restored_images.disabled, | |
#upscaled_images.disabled { | |
height: 0px !important; | |
opacity: 0; | |
transition: 300ms | |
} | |
#restored_images.enabled, | |
#upscaled_images.enabled { | |
transition: 300ms | |
} | |
""" + preview_css | |
custom_js = """ | |
const PageLoadObserver = new MutationObserver((mutationsList, observer) => { | |
for (let mutation of mutationsList) { | |
if (mutation.type === 'childList') { | |
const tabsDiv = document.querySelector('div.tab-nav'); | |
if (tabsDiv) { | |
observer.disconnect(); | |
document.querySelector('#textures_tab-button').addEventListener('click', () => { | |
setTimeout(() => { | |
let labels = document.querySelectorAll('label[data-testid]'); | |
labels.forEach((label) => { | |
let input = label.querySelector('input[type="radio"]'); | |
if (input) { | |
let title = input.value.split('.')[0].replace(/_/g, ' '); | |
label.title = title; | |
} | |
}); | |
document.querySelector("label[data-testid='05-radio-label']").click() | |
}, 150); | |
}) | |
function checkImagesAndSetClass(galleryElement) { | |
const firstDiv = galleryElement.querySelector('div:first-child'); | |
const hasChildElements = firstDiv && firstDiv.children.length > 0; | |
const hasImages = galleryElement.querySelectorAll('img').length > 0; | |
if (hasChildElements || hasImages) { | |
galleryElement.classList.add('enabled'); | |
galleryElement.classList.remove('disabled'); | |
} else { | |
galleryElement.classList.add('disabled'); | |
galleryElement.classList.remove('enabled'); | |
} | |
} | |
function setupGalleryObserver(galleryId) { | |
let gallery = document.getElementById(galleryId); | |
const observer = new MutationObserver(() => { | |
checkImagesAndSetClass(gallery); | |
}); | |
observer.observe(gallery, { childList: true, subtree: true }); | |
checkImagesAndSetClass(gallery); | |
} | |
setupGalleryObserver('restored_images'); | |
setupGalleryObserver('upscaled_images'); | |
function magnify(imgID, zoom) { | |
var img, glass, w, h, bw; | |
img = document.querySelector(imgID); | |
glass = document.createElement("DIV"); | |
glass.setAttribute("id", "zoom"); | |
img.parentElement.insertBefore(glass, img); | |
glass.style.backgroundImage = "url('" + img.src + "')"; | |
glass.style.backgroundRepeat = "no-repeat"; | |
glass.style.backgroundSize = (img.width * zoom) + "px " + (img.height * zoom) + "px"; | |
bw = 3; | |
w = glass.offsetWidth / 2; | |
h = glass.offsetHeight / 2; | |
glass.addEventListener("mousemove", moveMagnifier); | |
img.addEventListener("mousemove", moveMagnifier); | |
glass.addEventListener("touchmove", moveMagnifier); | |
img.addEventListener("touchmove", moveMagnifier); | |
function moveMagnifier(e) { | |
var pos, x, y; | |
e.preventDefault(); | |
pos = getCursorPos(e); | |
x = pos.x; | |
y = pos.y; | |
if (x > img.width - (w / zoom)) { x = img.width - (w / zoom); } | |
if (x < w / zoom) { x = w / zoom; } | |
if (y > img.height - (h / zoom)) { y = img.height - (h / zoom); } | |
if (y < h / zoom) { y = h / zoom; } | |
glass.style.left = (x - w) + "px"; | |
glass.style.top = (y - h) + "px"; | |
glass.style.backgroundPosition = "-" + ((x * zoom) - w + bw) + "px -" + ((y * zoom) - h) + "px"; | |
glass.style.backgroundImage = "url('" + img.src + "')"; | |
} | |
function getCursorPos(e) { | |
var a, x = 0, y = 0; | |
e = e || window.event; | |
a = img.getBoundingClientRect(); | |
x = e.pageX - a.left; | |
y = e.pageY - a.top; | |
x = x - window.scrollX; | |
y = y - window.scrollY; | |
return { x: x, y: y }; | |
} | |
img.addEventListener("mouseover", function () { | |
glass.style.opacity = "1"; | |
glass.style.transform = "scale(1)"; | |
}); | |
img.addEventListener("mouseout", function () { | |
glass.style.opacity = "0"; | |
glass.style.transform = "scale(0)"; | |
}); | |
} | |
function setupDownloadLink(imgSelector, linkSelector, linkId, magnifyImage) { | |
const imgElement = document.querySelector(imgSelector); | |
if (imgElement && imgElement.src) { | |
let downloadLink = document.querySelector(linkSelector); | |
if (!downloadLink) { | |
if (magnifyImage) { | |
magnify(magnifyImage, 3); | |
} | |
downloadLink = document.createElement('a'); | |
downloadLink.id = linkId; | |
downloadLink.innerText = 'скачать'; | |
imgElement.after(downloadLink); | |
} | |
downloadLink.href = imgElement.src; | |
downloadLink.download = ''; | |
} | |
} | |
const DownloadLinkObserverCallback = (mutationsList, observer, imgSelector, linkSelector, linkId, magnifyImage) => { | |
setupDownloadLink(imgSelector, linkSelector, linkId, magnifyImage); | |
}; | |
const DownloadLinkObserverOptions = { childList: true, subtree: true, attributes: true, attributeFilter: ['src'] }; | |
const ImageTexturedObserver = new MutationObserver((mutationsList, observer) => { | |
DownloadLinkObserverCallback(mutationsList, observer, '#textured_result img[data-testid="detailed-image"]', '#textured_result-download-link', 'textured_result-download-link', "#textured_result .image-button img"); | |
}); | |
ImageTexturedObserver.observe(document, DownloadLinkObserverOptions); | |
const ImageRestoredObserver = new MutationObserver((mutationsList, observer) => { | |
DownloadLinkObserverCallback(mutationsList, observer, '#restored_images img[data-testid="detailed-image"]', '#restored_image-download-link', 'restored_image-download-link'); | |
}); | |
ImageRestoredObserver.observe(document, DownloadLinkObserverOptions); | |
const ImageUpscaledObserver = new MutationObserver((mutationsList, observer) => { | |
DownloadLinkObserverCallback(mutationsList, observer, '#upscaled_images img[data-testid="detailed-image"]', '#upscaled_image-download-link', 'upscaled_image-download-link'); | |
}); | |
ImageUpscaledObserver.observe(document, DownloadLinkObserverOptions); | |
} | |
} | |
} | |
}); | |
PageLoadObserver.observe(document, { childList: true, subtree: true }); | |
""" | |
def extract_path_from_result(predict_answer: str | list[str] | tuple[str]) -> str: | |
if isinstance(predict_answer, (tuple, list)): | |
result = predict_answer[0] | |
shutil.rmtree(os.path.dirname(predict_answer[1]), ignore_errors=True) | |
else: | |
result = predict_answer | |
return result | |
def restore_face_common(img_path: str, predict_answer: str, model: str) -> None: | |
result = extract_path_from_result(predict_answer) | |
if os.path.exists(result): | |
if os.path.exists(img_path): | |
os.unlink(img_path) | |
new_file, new_extension = os.path.splitext(result) | |
old_file, old_extension = os.path.splitext(img_path) | |
old_filename = os.path.basename(old_file) | |
new_location = os.path.join(os.path.dirname(img_path), f"{old_filename}_{model}{new_extension}") | |
shutil.move(result, new_location) | |
shutil.rmtree(os.path.dirname(result), ignore_errors=True) | |
def restore_face_gfpgan(img_path: str) -> None: | |
client = Client(src="https://xintao-gfpgan.hf.space/", verbose=False) | |
result = client.predict(img_path, "v1.4", 4, api_name="/predict") | |
restore_face_common(img_path, result, "gfpgan") | |
def restore_face_codeformer(img_path: str) -> None: | |
client = Client(src="https://sczhou-codeformer.hf.space/", verbose=False) | |
result = client.predict(img_path, True, True, True, 2, 0, api_name="/predict") | |
restore_face_common(img_path, result, "codeformer") | |
async def restore_faces_one_image(img_path: str, func_list: list) -> bool: | |
def run_func(func) -> bool: | |
for _ in range(3): | |
try: | |
func(img_path) | |
return True | |
except Exception as e: | |
print(f"ошибка в {func.__name__}: {e}") | |
return False | |
loop = asyncio.get_event_loop() | |
with ThreadPoolExecutor(max_workers=len(func_list)) as executor: | |
futures = [loop.run_in_executor(executor, run_func, func) for func in func_list] | |
results = await asyncio.gather(*futures) | |
return any(results) | |
async def restore_faces_batch(input_images: list[str], func_list: list, batch_size: int = 3) -> bool: | |
results = False | |
try: | |
batches = [input_images[i:i + batch_size] for i in range(0, len(input_images), batch_size)] | |
for batch in batches: | |
tasks = [restore_faces_one_image(img_path, func_list) for img_path in batch] | |
results = await asyncio.gather(*tasks) | |
return any(results) | |
except Exception as error: | |
print(error) | |
return results | |
def get_file_paths(input_path: str | list[str], extensions_list: list[str]) -> list[str]: | |
files = [] | |
def add_files_from_directory(directory): | |
for file_name in os.listdir(directory): | |
if os.path.splitext(file_name)[1] in extensions_list: | |
files.append(os.path.abspath(os.path.join(directory, file_name))) | |
if isinstance(input_path, list): | |
for file_path in input_path: | |
parent_directory = os.path.dirname(file_path) | |
add_files_from_directory(parent_directory) | |
else: | |
add_files_from_directory(input_path) | |
return files | |
async def restore_upscale(files: tuple, restore_method: str) -> list[str]: | |
file_paths = [file.name for file in files] | |
if restore_method == 'codeformer': | |
func_list = [restore_face_codeformer] | |
elif restore_method == 'gfpgan': | |
func_list = [restore_face_gfpgan] | |
else: | |
func_list = [restore_face_codeformer, restore_face_gfpgan] | |
results = await restore_faces_batch(file_paths, func_list, batch_size=3) | |
if results: | |
file_paths = get_file_paths(file_paths, valid_extensions) | |
return file_paths | |
else: | |
return ['https://iili.io/JzrxjDP.png'] | |
def image_noise_softlight_layer_mix(img, texture, output: str = None, opacity: float = 0.7): | |
if isinstance(img, Image.Image): | |
img = np.array(img).astype(float) | |
elif isinstance(img, np.ndarray): | |
img = img.astype(float) | |
if img.shape[2] == 3 and not isinstance(img, Image.Image): | |
img = cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_RGB2BGR).astype(float) | |
overlay = cv2.imread(texture, cv2.IMREAD_UNCHANGED).astype(float) | |
start_x = np.random.randint(0, overlay.shape[1] - img.shape[1]) | |
start_y = np.random.randint(0, overlay.shape[0] - img.shape[0]) | |
overlay = overlay[start_y:start_y + img.shape[0], start_x:start_x + img.shape[1]] | |
if img.shape[2] == 3: | |
img = cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_RGB2RGBA).astype(float) | |
if overlay.shape[2] == 3: | |
overlay = cv2.cvtColor(overlay.astype(np.uint8), cv2.COLOR_RGB2RGBA).astype(float) | |
overlay[..., 3] *= opacity | |
img_in_norm = img / 255.0 | |
img_layer_norm = overlay / 255.0 | |
comp_alpha = np.minimum(img_in_norm[:, :, 3], img_layer_norm[:, :, 3]) * 1.0 | |
new_alpha = img_in_norm[:, :, 3] + (1.0 - img_in_norm[:, :, 3]) * comp_alpha | |
np.seterr(divide='ignore', invalid='ignore') | |
ratio = comp_alpha / new_alpha | |
ratio[ratio == np.NAN] = 0.0 | |
comp = (1.0 - img_in_norm[:, :, :3]) * img_in_norm[:, :, :3] * img_layer_norm[:, :, :3] + img_in_norm[:, :, :3] * ( | |
1.0 - (1.0 - img_in_norm[:, :, :3]) * (1.0 - img_layer_norm[:, :, :3])) | |
ratio_rs = np.reshape(np.repeat(ratio, 3), [comp.shape[0], comp.shape[1], comp.shape[2]]) | |
img_out = comp * ratio_rs + img_in_norm[:, :, :3] * (1.0 - ratio_rs) | |
img_out = np.nan_to_num(np.dstack((img_out, img_in_norm[:, :, 3]))) | |
result = img_out * 255.0 | |
rgb_image = cv2.cvtColor(result.astype(np.uint8), cv2.COLOR_BGR2RGB) | |
image = Image.fromarray(rgb_image) | |
return np.array(image) | |
def apply_texture(input_image, textures_choice: str, opacity_slider: float): | |
result = image_noise_softlight_layer_mix(input_image, os.path.join(textures_folder, textures_choice), opacity=opacity_slider) | |
return [result] | |
def temp_upload_file(file_path: str) -> str | None: | |
servers = [ | |
('https://transfer.sh/', 'fileToUpload'), | |
('https://x0.at/', 'file'), | |
('https://tmpfiles.org/api/v1/upload', 'file'), | |
('https://uguu.se/upload.php', 'files[]') | |
] | |
for i in range(3): | |
for server, file_key in servers: | |
try: | |
with open(file_path, 'rb') as f: | |
files = {file_key: f} | |
response = requests.post(server, files=files) | |
if response.status_code == 200: | |
if server == 'https://transfer.sh/': | |
return response.text.replace("https://transfer.sh/","https://transfer.sh/get/").replace("\n","") | |
elif server == 'https://tmpfiles.org/api/v1/upload': | |
response_json = response.json() | |
if response_json['status'] == 'success': | |
return response_json['data']['url'].replace("https://tmpfiles.org/", "https://tmpfiles.org/dl/") | |
elif server == 'https://uguu.se/upload.php': | |
response_json = response.json() | |
if response_json['success']: | |
return response_json['files'][0]['url'] | |
else: | |
return response.text | |
except Exception as e: | |
print(f'{server}: {e}') | |
return None | |
def upload_image(image_path: str) -> str | None: | |
files = {'source': open(image_path, "rb")} | |
data = {'key': '6d207e02198a847aa98d0a2a901485a5', 'action': 'upload', 'format': 'json'} | |
response = requests.post('https://freeimage.host/api/1/upload', files=files, data=data) | |
if response.json()["status_code"] == 200: | |
return response.json()["image"]["url"] | |
else: | |
return temp_upload_file(image_path) | |
def get_headers(url: str) -> dict: | |
session = requests.Session() | |
anon_auth = session.get(url) | |
cookies = session.cookies.get_dict() | |
return { | |
'content-type': 'application/json', | |
'cookie': f'csrftoken={cookies["csrftoken"]}; replicate_anonymous_id={cookies["replicate_anonymous_id"]};', | |
'origin': 'https://replicate.com', | |
'x-csrftoken': cookies['csrftoken'], | |
'authority': 'replicate.com', | |
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/jxl,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', | |
'accept-language': 'ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7', | |
'cache-control': 'no-cache', | |
'dnt': '1', | |
'pragma': 'no-cache', | |
'referer': f'{url}?input=http', | |
'sec-ch-ua': '"Chromium";v="117", "Not;A=Brand";v="8"', | |
'sec-ch-ua-mobile': '?0', | |
'sec-ch-ua-platform': '"Windows"', | |
'sec-fetch-dest': 'document', | |
'sec-fetch-mode': 'navigate', | |
'sec-fetch-site': 'same-origin', | |
'sec-fetch-user': '?1', | |
'upgrade-insecure-requests': '1', | |
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36' | |
} | |
def get_version(url: str) -> str: | |
url = url.rstrip('/') + '/versions' | |
response = requests.get(url) | |
class Version(HTMLParser): | |
def __init__(self): | |
super().__init__() | |
self.recording = 0 | |
self.data = '' | |
def handle_starttag(self, tag, attrs): | |
if tag == 'a': | |
for name, value in attrs: | |
if name == 'href' and '/versions/' in value: | |
self.recording = 1 | |
def handle_endtag(self, tag): | |
if tag == 'a' and self.recording: | |
self.recording -= 1 | |
def handle_data(self, data): | |
if self.recording: | |
self.data = data | |
parser = Version() | |
parser.feed(response.text) | |
return parser.data.strip() | |
def replicate_upscale(url: str, image_url: str, upscale: int = 2) -> str: | |
version = get_version(url) | |
headers = get_headers(url) | |
session = requests.Session() | |
anon_auth = session.get(url, headers=headers) | |
data = { | |
"version": version, | |
"input": { | |
"img": image_url, | |
"image": image_url, | |
"upscale": upscale, | |
"scale": upscale, | |
"version": "General - RealESRGANplus", | |
}, | |
"face_enhance": False, | |
"is_training": False, | |
"stream": False | |
} | |
response = session.post('https://replicate.com/api/predictions', headers=headers, data=json.dumps(data)) | |
prediction_id = response.json()['id'] | |
while True: | |
response = session.get(f'https://replicate.com/api/predictions/{prediction_id}', headers=headers) | |
if 'status' in response.json(): | |
status = response.json()['status'] | |
else: | |
status = 'processing' | |
if status == 'succeeded': | |
break | |
time.sleep(1) | |
session.close() | |
return response.json()['output'] | |
def upscaler(img_url: str) -> list[str] | None: | |
def run(url): | |
try: | |
return replicate_upscale(url, img_url) | |
except Exception as e: | |
print(e) | |
return None | |
urls = [ | |
'https://replicate.com/cjwbw/real-esrgan', | |
'https://replicate.com/daanelson/real-esrgan-a100', | |
'https://replicate.com/xinntao/realesrgan', | |
] | |
with ThreadPoolExecutor() as executor: | |
futures = {executor.submit(run, url) for url in urls} | |
for future in as_completed(futures): | |
result = future.result() | |
if result is not None: | |
break | |
return [result] | |
def check_upscale_result(image: str) -> list[str]: | |
attempt = 0 | |
response = None | |
while attempt < 3: | |
response = upscaler(upload_image(image)) | |
if response: | |
return response | |
attempt += 1 | |
return ['https://iili.io/JzrxjDP.png'] | |
with gr.Blocks(analytics_enabled=False, css=radio_css, theme='Taithrah/Minimal', title='апскейл') as demo: | |
with gr.Tab(label="апскейл", elem_id="upscale_tab"): | |
file_output = gr.Gallery(label="", container=True, object_fit="cover", columns=4, rows=4, allow_preview=True, preview=True, show_share_button=False, show_download_button=False, elem_id="upscaled_images") | |
upload_button = gr.UploadButton("выбор одного изображения для обработки", file_types=["image"], file_count="single", variant="primary") | |
upload_button.upload(fn=check_upscale_result, inputs=[upload_button], outputs=file_output, api_name="upscale") | |
with gr.Tab(label="восстановление лиц", id=1, elem_id="restore_tab"): | |
restore_method = gr.Radio(["codeformer", "gfpgan", "оба"], value="codeformer", label="", interactive=True) | |
restore_method.change(fn=lambda x: print(f"restore_method value = {x}"), inputs=restore_method, api_name="show_selected_method") | |
file_output = gr.Gallery(label="", container=True, object_fit="cover", columns=4, rows=4, allow_preview=True, preview=True, show_share_button=False, show_download_button=False, elem_id="restored_images") | |
upload_button = gr.UploadButton("выбор нескольких изображений для обработки", file_types=["image"], file_count="multiple", variant="primary") | |
upload_button.upload(fn=restore_upscale, inputs=[upload_button, restore_method], outputs=file_output, api_name="face_restore") | |
with gr.Tab(label="наложение зернистости пленки и шума", id=2, elem_id="textures_tab"): | |
with gr.Row(variant="compact", elem_id="textures_tab_images"): | |
input_image = gr.Image(label="исходник", sources=["upload", "clipboard"], type="numpy") | |
result_image = gr.Gallery(label="результат", elem_id="textured_result", allow_preview=True, preview=True, show_share_button=False, show_download_button=False) | |
opacity_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="видимость") | |
apply_button = gr.Button(value="применить", variant="primary") | |
texture_files = [(f"{i:02d}", texture) for i, texture in enumerate(os.listdir(textures_folder), start=1) if os.path.splitext(texture)[1].lower() in valid_extensions] | |
textures_choice = gr.Radio(texture_files, show_label=False, interactive=True, elem_id="textures") | |
apply_button.click(fn=apply_texture, inputs=[input_image, textures_choice, opacity_slider], outputs=result_image, api_name="texturize") | |
app = FastAPI() | |
async def some_fastapi_middleware(request: Request, call_next): | |
response = await call_next(request) | |
path = request.url.path | |
if path == "/": | |
response_body = "" | |
async for chunk in response.body_iterator: | |
response_body += chunk.decode() | |
javascript = f""" | |
<script type="text/javascript"> | |
{custom_js} | |
</script> | |
""" | |
response_body = response_body.replace("</body>", javascript + "</body>") | |
del response.headers["content-length"] | |
return Response( | |
content=response_body, | |
status_code=response.status_code, | |
headers=dict(response.headers), | |
media_type=response.media_type | |
) | |
return response | |
app.mount("/preview", StaticFiles(directory=os.path.join(root, 'preview')), name="preview") | |
gr.mount_gradio_app(app, demo, path="/") | |
uvicorn.run(app, host="0.0.0.0", port=7860) | |