# face_and_grain / app.py
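# Gradio app with three tabs: upscaling through public Real-ESRGAN demos on Replicate,
# face restoration through the GFPGAN and CodeFormer Hugging Face Spaces, and a
# film-grain / sensor-noise texture overlay blended onto an uploaded image.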
import os
import gradio as gr
import shutil
import numpy as np
from PIL import Image
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from gradio_client import Client
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from starlette.responses import Response
import uvicorn
import requests
from urllib.parse import quote
from unicodedata import normalize
import json
import time
from html.parser import HTMLParser
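# opencv-python is installed at runtime if it is not already present in the environment.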
try:
import cv2
except ImportError:
os.system('pip install opencv-python')
import cv2
root = os.path.dirname(os.path.abspath(__file__))
textures_folder = os.path.join(root, 'textures')
os.makedirs(textures_folder, exist_ok=True)
valid_extensions = ['.jpeg', '.jpg', '.png']
textures_repo = "https://huggingface.co/datasets/2ch/textures/resolve/main/"
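# Overlay textures hosted in the 2ch/textures dataset; the file names are Russian
# descriptions of the grain or noise type (film grain, dirty sensor, colour noise, dust, etc.).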
textures_for_download = [
f"{textures_repo}гауссовский_шум_и_мелкое_зерно.png?download=true",
f"{textures_repo}грязная_матрица.png?download=true",
f"{textures_repo}для_ночных_и_тёмных_кадров_сильный_шум_и_пыль.png?download=true",
f"{textures_repo}для_ночных_и_тёмных_кадров_царапины_шум_пыль_дымка.png?download=true",
f"{textures_repo}для_светлых_и_солнечных_ярких_фото_мелкое_констрастное_зерно.png?download=true",
f"{textures_repo}зернистость_плёнки.png?download=true",
f"{textures_repo}зернистость_плёнки_с_грязью.png?download=true",
f"{textures_repo}испорченная_ворсом_плёнка.png?download=true",
f"{textures_repo}мелкий_цветной_шум.png?download=true",
f"{textures_repo}мелкое_контрастное_зерно_и_средний_цветвой_шум.png?download=true",
f"{textures_repo}очень_мелкое_зерно.png?download=true",
f"{textures_repo}пыльная_плёнка.png?download=true",
f"{textures_repo}сильный_цветовой_шум_для_ночных_фото.png?download=true",
f"{textures_repo}слабый_естественный_шум_матрицы_смартфона.png?download=true",
f"{textures_repo}среднее_зерно.png?download=true",
f"{textures_repo}среднее_монохромное_зерно_пыль_и_ворсинки.png?download=true",
f"{textures_repo}средний_цветной_шум.png?download=true",
f"{textures_repo}старая_матрица.png?download=true",
f"{textures_repo}старая_потёртая_плёнка.png?download=true",
f"{textures_repo}цветной_шум_матрицы.png?download=true",
f"{textures_repo}цветной_шум_на_плёнке.png?download=true",
f"{textures_repo}шумная_матрица.png?download=true",
]
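# Download a single texture: the Cyrillic file name is NFD-normalised and URL-encoded
# before the request, and the response is streamed into the local textures folder.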
def dl_textures(texture_url: str) -> None:
texture_for_download = quote(normalize('NFD', texture_url), safe='/?:=')
filename = texture_url.split('/')[-1].split('?')[0]
file_path = os.path.join(textures_folder, filename)
response = requests.get(texture_for_download, stream=True)
response.raise_for_status()
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
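# Crop a random square patch out of every texture and save it as the preview thumbnail
# referenced by the generated CSS further down.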
def create_texture_preview(texture_folder: str, output_folder: str, size: tuple[int, int] = (246, 246)) -> None:
    os.makedirs(output_folder, exist_ok=True)
    for texture in os.listdir(texture_folder):
        img_path = os.path.join(texture_folder, texture)
        img = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)
        if img is None:  # skip anything OpenCV cannot decode
            continue
        start_x = np.random.randint(0, img.shape[1] - size[1])
        start_y = np.random.randint(0, img.shape[0] - size[0])
        img = img[start_y:start_y + size[0], start_x:start_x + size[1]]
        cv2.imwrite(os.path.join(output_folder, texture), img)
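# Fetch all textures in parallel, then build the 246x246 preview crops.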
def prepare_textures(texture_folder: str, output_folder: str) -> None:
with ThreadPoolExecutor(max_workers=len(textures_for_download)) as executor:
futures = [executor.submit(dl_textures, texture_for_download) for texture_for_download in
textures_for_download]
for future in as_completed(futures):
future.result()
create_texture_preview(texture_folder, output_folder, size=(246, 246))
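# Point the ::before pseudo-element of each texture radio label at its preview crop;
# the selectors rely on Gradio rendering data-testid="<label>-radio-label" for the
# "01", "02", ... labels defined in the UI below.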
preview_css = ""
prepare_textures(textures_folder, os.path.join(root, 'preview'))
for i, texture in enumerate(os.listdir(textures_folder), start=1):
if os.path.splitext(texture)[1].lower() in valid_extensions:
preview_css += f"""[data-testid="{i:02d}-radio-label"]::before {{
background-color: transparent !important;
background-image: url("./preview/{texture}") !important;
}}\n"""
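# Custom CSS: turns the texture picker into an image grid, hides Gradio branding,
# and styles the magnifier lens (#zoom), the download links and the gallery show/hide states.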
radio_css = """
html,
body {
background: var(--body-background-fill);
}
.gradio-container {
max-width: 1396px !important;
}
#textures label {
position: relative;
width: 256px;
height: 256px;
display: flex;
flex-direction: row;
align-items: flex-end;
background: none !important;
padding: 4px !important;
transition: .3s;
}
#textures label::before {
width: 246px;
height: 246px;
border-radius: 8px;
display: block;
content: "";
transition: .3s;
background: red;
position: relative;
top: 0px;
}
#textures label:hover::before,
#textures label:active::before,
#textures label.selected::before {
mix-blend-mode: soft-light;
transition: .3s
}
#textures span:not([data-testid="block-info"]),
#textures input {
position: absolute;
z-index: 999;
}
#textures input {
position: absolute;
z-index: 999;
bottom: 9px;
left: 9px;
}
#textures span:not([data-testid="block-info"]) {
left: 21px;
padding: 2px 8px;
background: rgba(0, 0, 0, .57);
backdrop-filter: blur(3px)
}
#textures {
background-color: hsla(0, 0%, 50%, 1);
}
.built-with,
.show-api,
footer .svelte-mpyp5e {
display: none !important;
}
footer:after {
content: "ну пролапс, ну и что?";
}
#zoom {
position: absolute;
top: 50%;
left: 50%;
width: 250px;
height: 250px;
background-repeat: no-repeat;
box-shadow: 0px 0px 10px 5px rgba(0, 0, 0, .2);
border-radius: 50%;
cursor: none;
pointer-events: none;
z-index: 999;
opacity: 0;
transform: scale(0);
transition: opacity 500ms, transform 500ms;
}
#textures_tab .image-button {
cursor: none;
}
#textured_result-download-link,
#restored_image-download-link,
#upscaled_image-download-link {
position: absolute;
z-index: 9999;
padding: 2px 4px;
margin: 0 7px;
background: black;
bottom: 0;
right: 0;
font-size: 20px;
transition: 300ms
}
#textured_result-download-link:hover,
#restored_image-download-link:hover,
#upscaled_image-download-link:hover {
color: #99f7a8
}
#restored_images.disabled,
#upscaled_images.disabled {
height: 0px !important;
opacity: 0;
transition: 300ms
}
#restored_images.enabled,
#upscaled_images.enabled {
transition: 300ms
}
""" + preview_css
custom_js = """
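// Injected into the served page by the FastAPI middleware at the bottom of this file:
// adds tooltips to the texture labels, toggles the result galleries, attaches
// download links to preview images and a magnifier lens to the textured result.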
const PageLoadObserver = new MutationObserver((mutationsList, observer) => {
for (let mutation of mutationsList) {
if (mutation.type === 'childList') {
const tabsDiv = document.querySelector('div.tab-nav');
if (tabsDiv) {
observer.disconnect();
document.querySelector('#textures_tab-button').addEventListener('click', () => {
setTimeout(() => {
let labels = document.querySelectorAll('label[data-testid]');
labels.forEach((label) => {
let input = label.querySelector('input[type="radio"]');
if (input) {
let title = input.value.split('.')[0].replace(/_/g, ' ');
label.title = title;
}
});
document.querySelector("label[data-testid='05-radio-label']").click()
}, 150);
})
function checkImagesAndSetClass(galleryElement) {
const firstDiv = galleryElement.querySelector('div:first-child');
const hasChildElements = firstDiv && firstDiv.children.length > 0;
const hasImages = galleryElement.querySelectorAll('img').length > 0;
if (hasChildElements || hasImages) {
galleryElement.classList.add('enabled');
galleryElement.classList.remove('disabled');
} else {
galleryElement.classList.add('disabled');
galleryElement.classList.remove('enabled');
}
}
function setupGalleryObserver(galleryId) {
let gallery = document.getElementById(galleryId);
const observer = new MutationObserver(() => {
checkImagesAndSetClass(gallery);
});
observer.observe(gallery, { childList: true, subtree: true });
checkImagesAndSetClass(gallery);
}
setupGalleryObserver('restored_images');
setupGalleryObserver('upscaled_images');
function magnify(imgID, zoom) {
var img, glass, w, h, bw;
img = document.querySelector(imgID);
glass = document.createElement("DIV");
glass.setAttribute("id", "zoom");
img.parentElement.insertBefore(glass, img);
glass.style.backgroundImage = "url('" + img.src + "')";
glass.style.backgroundRepeat = "no-repeat";
glass.style.backgroundSize = (img.width * zoom) + "px " + (img.height * zoom) + "px";
bw = 3;
w = glass.offsetWidth / 2;
h = glass.offsetHeight / 2;
glass.addEventListener("mousemove", moveMagnifier);
img.addEventListener("mousemove", moveMagnifier);
glass.addEventListener("touchmove", moveMagnifier);
img.addEventListener("touchmove", moveMagnifier);
function moveMagnifier(e) {
var pos, x, y;
e.preventDefault();
pos = getCursorPos(e);
x = pos.x;
y = pos.y;
if (x > img.width - (w / zoom)) { x = img.width - (w / zoom); }
if (x < w / zoom) { x = w / zoom; }
if (y > img.height - (h / zoom)) { y = img.height - (h / zoom); }
if (y < h / zoom) { y = h / zoom; }
glass.style.left = (x - w) + "px";
glass.style.top = (y - h) + "px";
glass.style.backgroundPosition = "-" + ((x * zoom) - w + bw) + "px -" + ((y * zoom) - h) + "px";
glass.style.backgroundImage = "url('" + img.src + "')";
}
function getCursorPos(e) {
var a, x = 0, y = 0;
e = e || window.event;
a = img.getBoundingClientRect();
x = e.pageX - a.left;
y = e.pageY - a.top;
x = x - window.scrollX;
y = y - window.scrollY;
return { x: x, y: y };
}
img.addEventListener("mouseover", function () {
glass.style.opacity = "1";
glass.style.transform = "scale(1)";
});
img.addEventListener("mouseout", function () {
glass.style.opacity = "0";
glass.style.transform = "scale(0)";
});
}
function setupDownloadLink(imgSelector, linkSelector, linkId, magnifyImage) {
const imgElement = document.querySelector(imgSelector);
if (imgElement && imgElement.src) {
let downloadLink = document.querySelector(linkSelector);
if (!downloadLink) {
if (magnifyImage) {
magnify(magnifyImage, 3);
}
downloadLink = document.createElement('a');
downloadLink.id = linkId;
downloadLink.innerText = 'скачать';
imgElement.after(downloadLink);
}
downloadLink.href = imgElement.src;
downloadLink.download = '';
}
}
const DownloadLinkObserverCallback = (mutationsList, observer, imgSelector, linkSelector, linkId, magnifyImage) => {
setupDownloadLink(imgSelector, linkSelector, linkId, magnifyImage);
};
const DownloadLinkObserverOptions = { childList: true, subtree: true, attributes: true, attributeFilter: ['src'] };
const ImageTexturedObserver = new MutationObserver((mutationsList, observer) => {
DownloadLinkObserverCallback(mutationsList, observer, '#textured_result img[data-testid="detailed-image"]', '#textured_result-download-link', 'textured_result-download-link', "#textured_result .image-button img");
});
ImageTexturedObserver.observe(document, DownloadLinkObserverOptions);
const ImageRestoredObserver = new MutationObserver((mutationsList, observer) => {
DownloadLinkObserverCallback(mutationsList, observer, '#restored_images img[data-testid="detailed-image"]', '#restored_image-download-link', 'restored_image-download-link');
});
ImageRestoredObserver.observe(document, DownloadLinkObserverOptions);
const ImageUpscaledObserver = new MutationObserver((mutationsList, observer) => {
DownloadLinkObserverCallback(mutationsList, observer, '#upscaled_images img[data-testid="detailed-image"]', '#upscaled_image-download-link', 'upscaled_image-download-link');
});
ImageUpscaledObserver.observe(document, DownloadLinkObserverOptions);
}
}
}
});
PageLoadObserver.observe(document, { childList: true, subtree: true });
"""
def extract_path_from_result(predict_answer: str | list[str] | tuple[str]) -> str:
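    """Return the image path from a Space's predict() answer.

    The answer may be a bare path or a (path, extra) tuple; the extra file's
    temporary directory is removed.
    """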
if isinstance(predict_answer, (tuple, list)):
result = predict_answer[0]
shutil.rmtree(os.path.dirname(predict_answer[1]), ignore_errors=True)
else:
result = predict_answer
return result
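# Replace the original upload with the restored image, renaming it to
# "<original name>_<model><ext>" in the same directory.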
def restore_face_common(img_path: str, predict_answer: str, model: str) -> None:
result = extract_path_from_result(predict_answer)
if os.path.exists(result):
if os.path.exists(img_path):
os.unlink(img_path)
new_file, new_extension = os.path.splitext(result)
old_file, old_extension = os.path.splitext(img_path)
old_filename = os.path.basename(old_file)
new_location = os.path.join(os.path.dirname(img_path), f"{old_filename}_{model}{new_extension}")
shutil.move(result, new_location)
shutil.rmtree(os.path.dirname(result), ignore_errors=True)
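# Thin wrappers around the public GFPGAN and CodeFormer Spaces via gradio_client;
# the positional predict() arguments mirror each Space's /predict API at the time of writing.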
def restore_face_gfpgan(img_path: str) -> None:
client = Client(src="https://xintao-gfpgan.hf.space/", verbose=False)
result = client.predict(img_path, "v1.4", 4, api_name="/predict")
restore_face_common(img_path, result, "gfpgan")
def restore_face_codeformer(img_path: str) -> None:
client = Client(src="https://sczhou-codeformer.hf.space/", verbose=False)
result = client.predict(img_path, True, True, True, 2, 0, api_name="/predict")
restore_face_common(img_path, result, "codeformer")
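# Run every selected restorer on one image, each in its own thread with up to three
# attempts; returns True if at least one restorer succeeded.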
async def restore_faces_one_image(img_path: str, func_list: list) -> bool:
def run_func(func) -> bool:
for _ in range(3):
try:
func(img_path)
return True
except Exception as e:
print(f"ошибка в {func.__name__}: {e}")
return False
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=len(func_list)) as executor:
futures = [loop.run_in_executor(executor, run_func, func) for func in func_list]
results = await asyncio.gather(*futures)
return any(results)
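# Process the uploaded images in batches, awaiting each batch before starting the next.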
async def restore_faces_batch(input_images: list[str], func_list: list, batch_size: int = 3) -> bool:
results = False
try:
batches = [input_images[i:i + batch_size] for i in range(0, len(input_images), batch_size)]
for batch in batches:
tasks = [restore_faces_one_image(img_path, func_list) for img_path in batch]
results = await asyncio.gather(*tasks)
return any(results)
except Exception as error:
print(error)
return results
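# Collect every file with a valid image extension from the directories containing the
# given paths (the restored copies are written next to the originals).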
def get_file_paths(input_path: str | list[str], extensions_list: list[str]) -> list[str]:
files = []
def add_files_from_directory(directory):
for file_name in os.listdir(directory):
            if os.path.splitext(file_name)[1].lower() in extensions_list:
files.append(os.path.abspath(os.path.join(directory, file_name)))
if isinstance(input_path, list):
for file_path in input_path:
parent_directory = os.path.dirname(file_path)
add_files_from_directory(parent_directory)
else:
add_files_from_directory(input_path)
return files
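# Handler for the face-restoration tab: run the chosen restorer(s) over the uploaded
# files and return the resulting paths, or a placeholder error image if everything failed.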
async def restore_upscale(files: tuple, restore_method: str) -> list[str]:
file_paths = [file.name for file in files]
if restore_method == 'codeformer':
func_list = [restore_face_codeformer]
elif restore_method == 'gfpgan':
func_list = [restore_face_gfpgan]
else:
func_list = [restore_face_codeformer, restore_face_gfpgan]
results = await restore_faces_batch(file_paths, func_list, batch_size=3)
if results:
file_paths = get_file_paths(file_paths, valid_extensions)
return file_paths
else:
return ['https://iili.io/JzrxjDP.png']
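# Soft-light blend of a noise/grain texture over the input image, computed in float RGBA:
# a random crop of the texture is taken at the image's size, its alpha is scaled by
# `opacity`, and the layers are combined with the soft-light formula
# comp = (1 - a) * a * b + a * (1 - (1 - a) * (1 - b)), weighted by the joint alpha.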
def image_noise_softlight_layer_mix(img, texture, output: str = None, opacity: float = 0.7):
if isinstance(img, Image.Image):
img = np.array(img).astype(float)
elif isinstance(img, np.ndarray):
img = img.astype(float)
if img.shape[2] == 3 and not isinstance(img, Image.Image):
img = cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_RGB2BGR).astype(float)
overlay = cv2.imread(texture, cv2.IMREAD_UNCHANGED).astype(float)
start_x = np.random.randint(0, overlay.shape[1] - img.shape[1])
start_y = np.random.randint(0, overlay.shape[0] - img.shape[0])
overlay = overlay[start_y:start_y + img.shape[0], start_x:start_x + img.shape[1]]
if img.shape[2] == 3:
img = cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_RGB2RGBA).astype(float)
if overlay.shape[2] == 3:
overlay = cv2.cvtColor(overlay.astype(np.uint8), cv2.COLOR_RGB2RGBA).astype(float)
overlay[..., 3] *= opacity
img_in_norm = img / 255.0
img_layer_norm = overlay / 255.0
comp_alpha = np.minimum(img_in_norm[:, :, 3], img_layer_norm[:, :, 3]) * 1.0
new_alpha = img_in_norm[:, :, 3] + (1.0 - img_in_norm[:, :, 3]) * comp_alpha
np.seterr(divide='ignore', invalid='ignore')
ratio = comp_alpha / new_alpha
    ratio[np.isnan(ratio)] = 0.0  # zero out NaNs from division by a zero alpha
comp = (1.0 - img_in_norm[:, :, :3]) * img_in_norm[:, :, :3] * img_layer_norm[:, :, :3] + img_in_norm[:, :, :3] * (
1.0 - (1.0 - img_in_norm[:, :, :3]) * (1.0 - img_layer_norm[:, :, :3]))
ratio_rs = np.reshape(np.repeat(ratio, 3), [comp.shape[0], comp.shape[1], comp.shape[2]])
img_out = comp * ratio_rs + img_in_norm[:, :, :3] * (1.0 - ratio_rs)
img_out = np.nan_to_num(np.dstack((img_out, img_in_norm[:, :, 3])))
result = img_out * 255.0
rgb_image = cv2.cvtColor(result.astype(np.uint8), cv2.COLOR_BGR2RGB)
image = Image.fromarray(rgb_image)
return np.array(image)
def apply_texture(input_image, textures_choice: str, opacity_slider: float):
result = image_noise_softlight_layer_mix(input_image, os.path.join(textures_folder, textures_choice), opacity=opacity_slider)
return [result]
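# Try a list of anonymous file hosts (up to three passes) and return the first direct
# download URL; each host expects a different form field and response format.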
def temp_upload_file(file_path: str) -> str | None:
servers = [
('https://transfer.sh/', 'fileToUpload'),
('https://x0.at/', 'file'),
('https://tmpfiles.org/api/v1/upload', 'file'),
('https://uguu.se/upload.php', 'files[]')
]
for i in range(3):
for server, file_key in servers:
try:
with open(file_path, 'rb') as f:
files = {file_key: f}
response = requests.post(server, files=files)
if response.status_code == 200:
if server == 'https://transfer.sh/':
return response.text.replace("https://transfer.sh/","https://transfer.sh/get/").replace("\n","")
elif server == 'https://tmpfiles.org/api/v1/upload':
response_json = response.json()
if response_json['status'] == 'success':
return response_json['data']['url'].replace("https://tmpfiles.org/", "https://tmpfiles.org/dl/")
elif server == 'https://uguu.se/upload.php':
response_json = response.json()
if response_json['success']:
return response_json['files'][0]['url']
else:
return response.text
except Exception as e:
print(f'{server}: {e}')
return None
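# Upload the image to freeimage.host with the hard-coded API key and fall back to the
# temporary hosts above; the Replicate demos are fed a URL rather than a local file.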
def upload_image(image_path: str) -> str | None:
    with open(image_path, "rb") as f:
        files = {'source': f}
        data = {'key': '6d207e02198a847aa98d0a2a901485a5', 'action': 'upload', 'format': 'json'}
        response = requests.post('https://freeimage.host/api/1/upload', files=files, data=data)
    if response.json().get("status_code") == 200:
        return response.json()["image"]["url"]
    else:
        return temp_upload_file(image_path)
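# Build browser-like headers for replicate.com from the anonymous-session cookies
# (csrftoken, replicate_anonymous_id) returned on a first GET; this mimics the web demo
# rather than the official API and may break if the site changes.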
def get_headers(url: str) -> dict:
session = requests.Session()
anon_auth = session.get(url)
cookies = session.cookies.get_dict()
return {
'content-type': 'application/json',
'cookie': f'csrftoken={cookies["csrftoken"]}; replicate_anonymous_id={cookies["replicate_anonymous_id"]};',
'origin': 'https://replicate.com',
'x-csrftoken': cookies['csrftoken'],
'authority': 'replicate.com',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/jxl,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7',
'cache-control': 'no-cache',
'dnt': '1',
'pragma': 'no-cache',
'referer': f'{url}?input=http',
'sec-ch-ua': '"Chromium";v="117", "Not;A=Brand";v="8"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'same-origin',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36'
}
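# Scrape the model's /versions page and return the text of the last version link,
# which is used as the version id in the prediction request.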
def get_version(url: str) -> str:
url = url.rstrip('/') + '/versions'
response = requests.get(url)
class Version(HTMLParser):
def __init__(self):
super().__init__()
self.recording = 0
self.data = ''
def handle_starttag(self, tag, attrs):
if tag == 'a':
for name, value in attrs:
if name == 'href' and '/versions/' in value:
self.recording = 1
def handle_endtag(self, tag):
if tag == 'a' and self.recording:
self.recording -= 1
def handle_data(self, data):
if self.recording:
self.data = data
parser = Version()
parser.feed(response.text)
return parser.data.strip()
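# Create a prediction through replicate.com's internal web API and poll it once a second
# until it finishes; the payload carries both input spellings (img/image, upscale/scale)
# because the mirrors below name their parameters differently.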
def replicate_upscale(url: str, image_url: str, upscale: int = 2) -> str:
version = get_version(url)
headers = get_headers(url)
session = requests.Session()
anon_auth = session.get(url, headers=headers)
data = {
"version": version,
"input": {
"img": image_url,
"image": image_url,
"upscale": upscale,
"scale": upscale,
"version": "General - RealESRGANplus",
},
"face_enhance": False,
"is_training": False,
"stream": False
}
response = session.post('https://replicate.com/api/predictions', headers=headers, data=json.dumps(data))
prediction_id = response.json()['id']
while True:
response = session.get(f'https://replicate.com/api/predictions/{prediction_id}', headers=headers)
if 'status' in response.json():
status = response.json()['status']
else:
status = 'processing'
        if status == 'succeeded':
            break
        if status in ('failed', 'canceled'):
            raise RuntimeError(f'prediction {prediction_id} {status}')
        time.sleep(1)
session.close()
return response.json()['output']
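# Try several public Real-ESRGAN mirrors on Replicate in parallel threads and keep the
# first output that comes back.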
def upscaler(img_url: str) -> list[str] | None:
def run(url):
try:
return replicate_upscale(url, img_url)
except Exception as e:
print(e)
return None
urls = [
'https://replicate.com/cjwbw/real-esrgan',
'https://replicate.com/daanelson/real-esrgan-a100',
'https://replicate.com/xinntao/realesrgan',
]
    result = None
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(run, url) for url in urls}
        for future in as_completed(futures):
            result = future.result()
            if result is not None:
                break
    return [result] if result is not None else None
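# Handler for the upscale tab: upload the image to get a public URL, try the upscaler
# up to three times, and fall back to a placeholder error image.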
def check_upscale_result(image: str) -> list[str]:
attempt = 0
response = None
while attempt < 3:
response = upscaler(upload_image(image))
if response:
return response
attempt += 1
return ['https://iili.io/JzrxjDP.png']
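# Three-tab Russian-language UI: "апскейл" = upscale, "восстановление лиц" = face
# restoration, "наложение зернистости пленки и шума" = film grain and noise overlay.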
with gr.Blocks(analytics_enabled=False, css=radio_css, theme='Taithrah/Minimal', title='апскейл') as demo:
with gr.Tab(label="апскейл", elem_id="upscale_tab"):
file_output = gr.Gallery(label="", container=True, object_fit="cover", columns=4, rows=4, allow_preview=True, preview=True, show_share_button=False, show_download_button=False, elem_id="upscaled_images")
upload_button = gr.UploadButton("выбор одного изображения для обработки", file_types=["image"], file_count="single", variant="primary")
upload_button.upload(fn=check_upscale_result, inputs=[upload_button], outputs=file_output, api_name="upscale")
with gr.Tab(label="восстановление лиц", id=1, elem_id="restore_tab"):
restore_method = gr.Radio(["codeformer", "gfpgan", "оба"], value="codeformer", label="", interactive=True)
restore_method.change(fn=lambda x: print(f"restore_method value = {x}"), inputs=restore_method, api_name="show_selected_method")
file_output = gr.Gallery(label="", container=True, object_fit="cover", columns=4, rows=4, allow_preview=True, preview=True, show_share_button=False, show_download_button=False, elem_id="restored_images")
upload_button = gr.UploadButton("выбор нескольких изображений для обработки", file_types=["image"], file_count="multiple", variant="primary")
upload_button.upload(fn=restore_upscale, inputs=[upload_button, restore_method], outputs=file_output, api_name="face_restore")
with gr.Tab(label="наложение зернистости пленки и шума", id=2, elem_id="textures_tab"):
with gr.Row(variant="compact", elem_id="textures_tab_images"):
input_image = gr.Image(label="исходник", sources=["upload", "clipboard"], type="numpy")
result_image = gr.Gallery(label="результат", elem_id="textured_result", allow_preview=True, preview=True, show_share_button=False, show_download_button=False)
opacity_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="видимость")
apply_button = gr.Button(value="применить", variant="primary")
texture_files = [(f"{i:02d}", texture) for i, texture in enumerate(os.listdir(textures_folder), start=1) if os.path.splitext(texture)[1].lower() in valid_extensions]
textures_choice = gr.Radio(texture_files, show_label=False, interactive=True, elem_id="textures")
apply_button.click(fn=apply_texture, inputs=[input_image, textures_choice, opacity_slider], outputs=result_image, api_name="texturize")
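# Wrap the Gradio app in FastAPI so the custom JavaScript can be injected into the main
# page response and the texture previews can be served as static files under /preview.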
app = FastAPI()
@app.middleware("http")
async def some_fastapi_middleware(request: Request, call_next):
response = await call_next(request)
path = request.url.path
if path == "/":
response_body = ""
async for chunk in response.body_iterator:
response_body += chunk.decode()
javascript = f"""
<script type="text/javascript">
{custom_js}
</script>
"""
response_body = response_body.replace("</body>", javascript + "</body>")
del response.headers["content-length"]
return Response(
content=response_body,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type
)
return response
app.mount("/preview", StaticFiles(directory=os.path.join(root, 'preview')), name="preview")
gr.mount_gradio_app(app, demo, path="/")
uvicorn.run(app, host="0.0.0.0", port=7860)