# Captured from a Hugging Face Space page; the Space status at capture time was "Runtime error".
import gradio as gr | |
import requests | |
import time | |
import json | |
from contextlib import closing | |
from websocket import create_connection | |
from deep_translator import GoogleTranslator | |
from langdetect import detect | |
import os | |
from PIL import Image | |
import io | |
import base64 | |
import os | |
import random | |
import tempfile | |
import re | |
from gradio_client import Client | |
import moviepy.editor as mp | |
def animate_img(encoded_string, model):
    """Animate a still image through the stable-video-diffusion.com service.

    The image is uploaded, then the result page is polled until it no longer
    reports that generation is in progress, and the first ``.mp4`` link found
    on that page is returned.

    Args:
        encoded_string: Filesystem path of the source image. Despite the
            name, the value is opened as a file rather than decoded.
        model: Backend name selected in the UI. Only
            ``"Stable Video Diffusion"`` is handled.

    Returns:
        The URL of the generated video, or ``None`` when the model is not
        handled, the upload or polling fails, the result page contains no
        video link, or generation is still running after the last poll.
    """
    # NOTE: a fallback through a Gradio client (env var ``url_hg1``) and an
    # "AnimateDiff" backend (env var ``url_hg2``) used to live here as
    # commented-out code; both are disabled.
    if model != "Stable Video Diffusion":
        return None

    pattern = r'https://storage.stable-video-diffusion.com/([a-f0-9]{32})\.mp4'
    try:
        # The context manager closes the upload handle even if the POST fails.
        with open(encoded_string, 'rb') as image_file:
            upload = requests.post(
                "https://stable-video-diffusion.com/api/upload",
                files={"file": image_file},
                timeout=120,
            )
        hash_ = upload.json()['hash']

        time.sleep(10)  # give the service a head start before the first poll
        for _ in range(10):
            source_string = requests.get(
                f"https://stable-video-diffusion.com/result?hash={hash_}",
                timeout=120,
            ).text
            if "Generation has been in progress for" in source_string:
                time.sleep(15)
                continue

            found = re.search(pattern, source_string)
            if found is None:
                # Generation finished but the page carries no video link.
                print("f_1")
                return None
            print("s_1")
            return f"https://storage.stable-video-diffusion.com/{found.group(1)}.mp4"

        # Still "in progress" after every poll.
        print("f_1")
        return None
    except Exception as exc:
        # Best-effort UI handler: report the failure and show no video.
        print(f"animate_img failed: {exc}")
        return None
# Negative prompt sent with every text-to-image request.
_NEGATIVE_PROMPT = (
    "[deformed | disfigured], poorly drawn, [bad : wrong] anatomy, "
    "[extra | missing | floating | disconnected] limb, "
    "(mutated hands and fingers), blurry"
)


def _decode_data_uri(photo):
    """Strip a JPEG/PNG data-URI prefix and return the decoded image bytes."""
    base64_string = photo.replace('data:image/jpeg;base64,', '').replace('data:image/png;base64,', '')
    return base64.b64decode(base64_string)


def _txt2img_primary(url, prompt):
    """Generate an image over the websocket at *url* (fn_index 3) and return its bytes."""
    # json.dumps escapes quotes/backslashes/newlines in the prompt, which the
    # previous hand-built JSON string did not.
    request = json.dumps({
        "data": [prompt, _NEGATIVE_PROMPT, 7.5, "(No style)"],
        "event_data": None,
        "fn_index": 3,
        "session_hash": "",
    })
    with closing(create_connection(f"{url}", timeout=120)) as conn:
        conn.send('{"fn_index":3,"session_hash":""}')
        conn.send(request)
        estimations = 0
        # Skip queue messages until processing starts; only 'estimation'
        # messages count towards the limit of 60.
        while estimations < 60:
            status = json.loads(conn.recv())['msg']
            if status == 'estimation':
                estimations += 1
                time.sleep(1)
                continue
            if status == 'process_starts':
                break
        photo = json.loads(conn.recv())['output']['data'][0][0]
    return _decode_data_uri(photo)


def _txt2img_fallback(url, prompt):
    """Generate an image over the websocket at *url* (fn_index 0) and return its bytes."""
    request = json.dumps({
        "data": [
            prompt,
            _NEGATIVE_PROMPT,
            "dreamshaperXL10_alpha2.safetensors [c8afe2ef]",
            30,
            "DPM++ 2M Karras",
            7,
            1024,
            1024,
            -1,
        ],
        "event_data": None,
        "fn_index": 0,
        "session_hash": "",
    })
    with closing(create_connection(f"{url}", timeout=120)) as conn:
        conn.send('{"fn_index":0,"session_hash":""}')
        conn.send(request)
        # The first four messages are discarded; the fifth carries the output.
        for _ in range(4):
            conn.recv()
        photo = json.loads(conn.recv())['output']['data'][0]
    return _decode_data_uri(photo)


def _image_to_svd_url(image_bytes):
    """Upload image bytes to stable-video-diffusion.com and return the video URL, or None."""
    pattern = r'https://storage.stable-video-diffusion.com/([a-f0-9]{32})\.mp4'
    try:
        upload = requests.post(
            "https://stable-video-diffusion.com/api/upload",
            files={"file": image_bytes},
            timeout=120,
        )
        print(upload.text)
        hash_ = upload.json()['hash']

        time.sleep(10)  # give the service a head start before the first poll
        for _ in range(10):
            source_string = requests.get(
                f"https://stable-video-diffusion.com/result?hash={hash_}",
                timeout=120,
            ).text
            if "Generation has been in progress for" in source_string:
                time.sleep(15)
                continue

            found = re.search(pattern, source_string)
            if found is None:
                # Generation finished but the page carries no video link.
                return None
            video_url = f"https://storage.stable-video-diffusion.com/{found.group(1)}.mp4"
            print(video_url)
            return video_url
        return None
    except Exception as exc:
        # Best-effort: report the failure and show no video.
        print(f"create_video: video generation failed: {exc}")
        return None


def _animatediff_clip(prompt):
    """Request a GIF from sd.cuilutech.com and convert it to a temporary .mp4; return its path or None."""
    task = requests.post(
        "https://sd.cuilutech.com/sdapi/async/txt2gif",
        json={"prompt": prompt, "negative_prompt": "EasyNegative"},
        timeout=120,
    )
    task_id = task.json()['data']['task_id']

    gif_url = None
    for _ in range(60):
        info = requests.post(
            "https://sd.cuilutech.com/sdapi/get_task_info",
            json={'task_id': task_id},
            timeout=120,
        ).json()
        if info['data']:
            gif_url = info['data']['image_urls'][0]
            break
        time.sleep(2)
    if gif_url is None:
        # The task never reported data; previously this path crashed on an
        # unbound variable.
        return None

    gif_bytes = requests.get(gif_url, timeout=120).content
    with tempfile.NamedTemporaryFile(delete=False) as source:
        source.write(gif_bytes)
        source_path = source.name

    # Reserve the output name and close the handle right away so the encoder
    # can write to the path and no descriptor is leaked.
    with tempfile.NamedTemporaryFile(prefix=f'aaafff{time.time()}', suffix='.mp4', delete=False) as target:
        target_path = target.name

    try:
        clip = mp.VideoFileClip(source_path)
        try:
            clip.write_videofile(target_path)
        finally:
            clip.close()
    finally:
        os.remove(source_path)  # the downloaded GIF is only an intermediate
    return target_path


def create_video(prompt, model):
    """Generate a video from a text prompt.

    For ``"Stable Video Diffusion"`` an image is first generated over a
    websocket (endpoint from env var ``url_sd3``, falling back to ``url_sd4``
    if the first attempt raises) and then animated through
    stable-video-diffusion.com. For ``"AnimateDiff"`` a GIF is requested from
    sd.cuilutech.com and converted to ``.mp4``.

    Args:
        prompt: Text description of the desired video.
        model: Backend name selected in the UI.

    Returns:
        A video URL (Stable Video Diffusion), a local ``.mp4`` path
        (AnimateDiff), or ``None`` when the model is not handled or no video
        could be produced.

    Raises:
        Exception: Whatever the fallback image endpoint or the AnimateDiff
            requests raise; only the first image attempt and the
            image-to-video step are handled as best-effort.
    """
    if model == "Stable Video Diffusion":
        try:
            image_bytes = _txt2img_primary(os.getenv("url_sd3"), prompt)
        except Exception:
            print("c_2")
            image_bytes = _txt2img_fallback(os.getenv("url_sd4"), prompt)
        return _image_to_svd_url(image_bytes)

    if model == "AnimateDiff":
        return _animatediff_clip(prompt)

    return None
def flip_text1(prompt, motion, max_polls=200):
    """Generate a video from a text prompt with a chosen camera motion.

    A generation task is submitted to the endpoint in env var
    ``url_video_g``; the endpoint in env var ``url_video_c`` is then polled
    with the returned task id. While the poll reply is JSON carrying a
    ``status`` the task is treated as pending; the first non-JSON reply is
    taken to be the video and is saved to a temporary ``.mp4`` file.

    Args:
        prompt: Text description. Russian text is translated to English; if
            language detection or translation fails the prompt becomes
            ``'video'``.
        motion: Camera-motion label from the UI (Russian), mapped to the
            English keyword sent to the service. Unknown values are passed
            through unchanged.
        max_polls: Upper bound on status polls (3 seconds apart) before
            giving up. Previously the loop was unbounded.

    Returns:
        Path of the saved ``.mp4`` file, or ``None`` if the task did not
        finish within ``max_polls`` polls, the reply was JSON without a
        status, or saving the file failed.
    """
    motion_keywords = {
        "Приближение →←": 'zoom in',
        "Отдаление ←→": 'zoom out',
        "Вверх ↑": 'up',
        "Вниз ↓": 'down',
        "Влево ←": 'left',
        "Вправо →": 'right',
        "По часовой стрелке ⟳": 'rotate cw',
        "Против часовой стрелки ⟲": 'rotate ccw',
    }

    try:
        if detect(prompt) == 'ru':
            prompt = GoogleTranslator(source='ru', target='en').translate(prompt)
            print(prompt)
    except Exception:
        prompt = 'video'

    url_video_g = os.getenv("url_video_g")
    url_video_c = os.getenv("url_video_c")
    motion = motion_keywords.get(motion, motion)

    data = {"prompt": f"{prompt}", "image": "null", "denoise": 0.75, "motion": motion}
    submitted = requests.post(f"{url_video_g}", json=data, timeout=120)
    # Parse the task id once instead of on every poll.
    poll_request = {"task_id": f"{submitted.json()['task_id']}"}

    for _ in range(max_polls):
        reply = requests.post(f"{url_video_c}", json=poll_request, timeout=120)
        try:
            payload = reply.json()
        except ValueError:
            # Not JSON: treat the body as the finished video.
            try:
                with tempfile.NamedTemporaryFile(prefix=f'aaafff{time.time()}', suffix='.mp4', delete=False) as file:
                    for chunk in reply.iter_content(chunk_size=1024):
                        if chunk:
                            file.write(chunk)
                return file.name
            except Exception as e:
                print(e)
                return None

        if not isinstance(payload, dict) or 'status' not in payload:
            # A JSON body is never the video itself; do not save it as .mp4.
            print(f"flip_text1: unexpected response: {payload}")
            return None
        time.sleep(3)

    print("flip_text1: gave up waiting for the video")
    return None
def flip_text2(encoded_string, prompt, motion, max_polls=200):
    """Animate an image, guided by a text prompt and a camera motion.

    The image is sent base64-encoded with the generation task to the
    endpoint in env var ``url_video_g``; the endpoint in env var
    ``url_video_c`` is then polled with the returned task id. While the poll
    reply is JSON carrying a ``status`` the task is treated as pending; the
    first non-JSON reply is taken to be the video and is saved to a temporary
    ``.mp4`` file.

    Args:
        encoded_string: Filesystem path of the source image. Despite the
            name, the value is opened as a file.
        prompt: Text description. Russian text is translated to English; if
            language detection or translation fails the prompt is sent
            unchanged.
        motion: Camera-motion label from the UI (Russian), mapped to the
            English keyword sent to the service. Unknown values are passed
            through unchanged.
        max_polls: Upper bound on status polls (3 seconds apart) before
            giving up. Previously the loop was unbounded.

    Returns:
        Path of the saved ``.mp4`` file, or ``None`` if the task did not
        finish within ``max_polls`` polls, the reply was JSON without a
        status, or saving the file failed.
    """
    motion_keywords = {
        "Приближение →←": 'zoom in',
        "Отдаление ←→": 'zoom out',
        "Вверх ↑": 'up',
        "Вниз ↓": 'down',
        "Влево ←": 'left',
        "Вправо →": 'right',
        "По часовой стрелке ⟳": 'rotate cw',
        "Против часовой стрелки ⟲": 'rotate ccw',
    }

    url_video_g = os.getenv("url_video_g")
    url_video_c = os.getenv("url_video_c")

    try:
        if detect(prompt) == 'ru':
            prompt = GoogleTranslator(source='ru', target='en').translate(prompt)
            print(prompt)
    except Exception:
        # Translation is optional here; fall back to the prompt as given.
        pass

    motion = motion_keywords.get(motion, motion)

    with open(encoded_string, "rb") as image_file:
        # Decode to text properly; str(bytes) left a stray trailing quote in
        # the payload.
        encoded_string2 = base64.b64encode(image_file.read()).decode("ascii")

    data = {"prompt": f"{prompt}", "image": encoded_string2, "denoise": 0.75, "motion": motion}
    submitted = requests.post(f"{url_video_g}", json=data, timeout=120)
    # Parse the task id once instead of on every poll.
    poll_request = {"task_id": f"{submitted.json()['task_id']}"}

    for _ in range(max_polls):
        reply = requests.post(f"{url_video_c}", json=poll_request, timeout=120)
        try:
            payload = reply.json()
        except ValueError:
            # Not JSON: treat the body as the finished video.
            try:
                with tempfile.NamedTemporaryFile(prefix=f'aaafff{time.time()}', suffix='.mp4', delete=False) as file:
                    for chunk in reply.iter_content(chunk_size=1024):
                        if chunk:
                            file.write(chunk)
                return file.name
            except Exception as e:
                print(e)
                return None

        if not isinstance(payload, dict) or 'status' not in payload:
            # A JSON body is never the video itself; do not save it as .mp4.
            print(f"flip_text2: unexpected response: {payload}")
            return None
        time.sleep(3)

    print("flip_text2: gave up waiting for the video")
    return None
# Custom styling: pink pill-shaped "generate" buttons and a hidden footer.
css = """
#generate {
width: 100%;
background: #e253dd !important;
border: none;
border-radius: 50px;
outline: none !important;
color: white;
}
#generate:hover {
background: #de6bda !important;
outline: none !important;
color: #fff;
}
footer {visibility: hidden !important;}
"""

# Two-tab UI: text-to-video (create_video) and image-to-video (animate_img).
# The camera-motion dropdowns and the optional description box that would
# feed flip_text1/flip_text2 are currently disabled, so only the
# "Stable Video Diffusion" backend is selectable.
with gr.Blocks(css=css) as demo:
    with gr.Tab("Сгенерировать видео"):
        with gr.Column():
            prompt = gr.Textbox(
                placeholder="Введите описание видео...",
                show_label=True,
                label='Описание:',
                lines=3,
            )
            model = gr.Radio(
                interactive=True,
                value="Stable Video Diffusion",
                show_label=True,
                label="Модель нейросети:",
                choices=['Stable Video Diffusion'],
            )
        with gr.Column():
            text_button = gr.Button("Сгенерировать видео", variant='primary', elem_id="generate")
        with gr.Column():
            # `type="file"` was dropped: it is not a gr.Video parameter.
            video_output = gr.Video(show_label=True, label='Результат:')
        text_button.click(create_video, inputs=[prompt, model], outputs=video_output)

    with gr.Tab("Анимировать изображение"):
        with gr.Column():
            # type='filepath' hands animate_img a path on disk.
            prompt2 = gr.Image(
                show_label=True,
                interactive=True,
                type='filepath',
                label='Исходное изображение:',
            )
            model2 = gr.Radio(
                interactive=True,
                value="Stable Video Diffusion",
                show_label=True,
                label="Модель нейросети:",
                choices=['Stable Video Diffusion'],
            )
        with gr.Column():
            text_button2 = gr.Button("Анимировать изображение", variant='primary', elem_id="generate")
        with gr.Column():
            video_output2 = gr.Video(show_label=True, label='Результат:')
        text_button2.click(animate_img, inputs=[prompt2, model2], outputs=video_output2)

# NOTE(review): `concurrency_count` is a Gradio 3.x argument; confirm the
# pinned Gradio version before upgrading, as newer releases reject it.
demo.queue(concurrency_count=12)
demo.launch()