import gradio as gr import requests import time import json from contextlib import closing from websocket import create_connection from deep_translator import GoogleTranslator from langdetect import detect import os from PIL import Image import io import base64 import os import random import tempfile import re from gradio_client import Client def animate_img(encoded_string): try: r = requests.post("https://stable-video-diffusion.com/api/upload", files={"file": open(encoded_string, 'rb')}) print(r.text) hash_ = r.json()['hash'] time.sleep(10) c = 0 while c < 10: r2 = requests.get(f"https://stable-video-diffusion.com/result?hash={hash_}") source_string = r2.text if "Generation has been in progress for" in source_string: time.sleep(10) c += 1 continue if "Generation has been in progress for" not in source_string: pattern = r'https://storage.stable-video-diffusion.com/([a-f0-9]{32})\.mp4' matches = re.findall(pattern, source_string) sd_video = [] for match in matches: sd_video.append(f"https://storage.stable-video-diffusion.com/{match}.mp4") print(sd_video[0]) return sd_video[0] except: client1 = Client("https://emmadrex-stable-video-diffusion.hf.space") result1 = client1.predict(encoded_string, api_name="/resize_image") client = Client("https://emmadrex-stable-video-diffusion.hf.space") result = client.predict(result1, 0, True, 1, 15, api_name="/video") return result[0]['video'] def create_video(prompt): url_sd3 = os.getenv("url_sd3") url_sd4 = os.getenv("url_sd4") try: with closing(create_connection(f"{url_sd3}", timeout=120)) as conn: conn.send('{"fn_index":3,"session_hash":""}') conn.send(f'{{"data":["{prompt}","[deformed | disfigured], poorly drawn, [bad : wrong] anatomy, [extra | missing | floating | disconnected] limb, (mutated hands and fingers), blurry",7.5,"(No style)"],"event_data":null,"fn_index":3,"session_hash":""}}') c = 0 while c < 45: status = json.loads(conn.recv())['msg'] if status == 'estimation': c += 1 time.sleep(1) continue if status == 'process_starts': break photo = json.loads(conn.recv())['output']['data'][0][0] base64_string = photo.replace('data:image/jpeg;base64,', '').replace('data:image/png;base64,', '') image_bytes = base64.b64decode(base64_string) with tempfile.NamedTemporaryFile(delete=False) as temp: temp.write(image_bytes) temp_file_path = temp.name except: with closing(create_connection(f"{url_sd4}", timeout=120)) as conn: conn.send('{"fn_index":0,"session_hash":""}') conn.send(f'{{"data":["{prompt}","[deformed | disfigured], poorly drawn, [bad : wrong] anatomy, [extra | missing | floating | disconnected] limb, (mutated hands and fingers), blurry","dreamshaperXL10_alpha2.safetensors [c8afe2ef]",30,"DPM++ 2M Karras",7,1024,1024,-1],"event_data":null,"fn_index":0,"session_hash":""}}') conn.recv() conn.recv() conn.recv() conn.recv() photo = json.loads(conn.recv())['output']['data'][0] base64_string = photo.replace('data:image/jpeg;base64,', '').replace('data:image/png;base64,', '') image_bytes = base64.b64decode(base64_string) with tempfile.NamedTemporaryFile(delete=False) as temp: temp.write(image_bytes) temp_file_path = temp.name try: r = requests.post("https://stable-video-diffusion.com/api/upload", files={"file": open(temp_file_path, 'rb')}) print(r.text) hash_ = r.json()['hash'] time.sleep(10) c = 0 while c < 10: r2 = requests.get(f"https://stable-video-diffusion.com/result?hash={hash_}") source_string = r2.text if "Generation has been in progress for" in source_string: time.sleep(10) c += 1 continue if "Generation has been in progress for" not in source_string: pattern = r'https://storage.stable-video-diffusion.com/([a-f0-9]{32})\.mp4' matches = re.findall(pattern, source_string) sd_video = [] for match in matches: sd_video.append(f"https://storage.stable-video-diffusion.com/{match}.mp4") print(sd_video[0]) return sd_video[0] except: client1 = Client("https://emmadrex-stable-video-diffusion.hf.space") result1 = client1.predict(encoded_string, api_name="/resize_image") client = Client("https://emmadrex-stable-video-diffusion.hf.space") result = client.predict(result1, 0, True, 1, 15, api_name="/video") return result[0]['video'] def flip_text1(prompt, motion): try: language = detect(prompt) if language == 'ru': prompt = GoogleTranslator(source='ru', target='en').translate(prompt) print(prompt) except: prompt = 'video' url_video_g = os.getenv("url_video_g") url_video_c = os.getenv("url_video_c") if motion == "Приближение →←": motion = 'zoom in' if motion == "Отдаление ←→": motion = 'zoom out' if motion == "Вверх ↑": motion = 'up' if motion == "Вниз ↓": motion = 'down' if motion == "Влево ←": motion = 'left' if motion == "Вправо →": motion = 'right' if motion == "По часовой стрелке ⟳": motion = 'rotate cw' if motion == "Против часовой стрелки ⟲": motion = 'rotate ccw' data = {"prompt": f"{prompt}","image": "null", "denoise": 0.75,"motion": motion} r = requests.post(f"{url_video_g}", json=data) while True: data2 = {"task_id": f"{r.json()['task_id']}"} r2 = requests.post(f"{url_video_c}", json=data2) time.sleep(3) try: if r2.json()['status'] == "QUEUED": continue if r2.json()['status'] == "PROCESSING": continue except: try: n_im2 = f"{time.time()}" with tempfile.NamedTemporaryFile(prefix=f'aaafff{n_im2}', suffix='.mp4', delete=False) as file: for chunk in r2.iter_content(chunk_size=1024): if chunk: file.write(chunk) return file.name except Exception as e: print(e) break def flip_text2(encoded_string, prompt, motion): url_video_g = os.getenv("url_video_g") url_video_c = os.getenv("url_video_c") try: language = detect(prompt) if language == 'ru': prompt = GoogleTranslator(source='ru', target='en').translate(prompt) print(prompt) except: pass if motion == "Приближение →←": motion = 'zoom in' if motion == "Отдаление ←→": motion = 'zoom out' if motion == "Вверх ↑": motion = 'up' if motion == "Вниз ↓": motion = 'down' if motion == "Влево ←": motion = 'left' if motion == "Вправо →": motion = 'right' if motion == "По часовой стрелке ⟳": motion = 'rotate cw' if motion == "Против часовой стрелки ⟲": motion = 'rotate ccw' with open(encoded_string, "rb") as image_file: encoded_string2 = base64.b64encode(image_file.read()) encoded_string2 = str(encoded_string2).replace("b'", '') data = {"prompt": f"{prompt}","image": f"{encoded_string2}","denoise":0.75,"motion": motion} r = requests.post(f"{url_video_g}", json=data) while True: data2 = {"task_id": f"{r.json()['task_id']}"} r2 = requests.post(f"{url_video_c}", json=data2) time.sleep(3) try: if r2.json()['status'] == "QUEUED": continue if r2.json()['status'] == "PROCESSING": continue except: try: n_im2 = f"{time.time()}" with tempfile.NamedTemporaryFile(prefix=f'aaafff{n_im2}', suffix='.mp4', delete=False) as file: for chunk in r2.iter_content(chunk_size=1024): if chunk: file.write(chunk) return file.name except Exception as e: print(e) break css = """ #generate { width: 100%; background: #e253dd !important; border: none; border-radius: 50px; outline: none !important; color: white; } #generate:hover { background: #de6bda !important; outline: none !important; color: #fff; } footer {visibility: hidden !important;} """ with gr.Blocks(css=css) as demo: with gr.Tab("Сгенерировать видео"): with gr.Column(): prompt = gr.Textbox(placeholder="Введите описание видео...", show_label=True, label='Описание:', lines=3) motion1 = gr.Dropdown(value="Приближение →←", interactive=True, show_label=True, label="Движение камеры:", choices=[ "Приближение →←", "Отдаление ←→", "Вверх ↑", "Вниз ↓", "Влево ←", "Вправо →", "По часовой стрелке ⟳", "Против часовой стрелки ⟲"]) with gr.Column(): text_button = gr.Button("Сгенерировать видео", variant='primary', elem_id="generate") with gr.Column(): video_output = gr.Video(show_label=True, label='Результат:', type="file") text_button.click(create_video, inputs=[prompt], outputs=video_output) with gr.Tab("Анимировать изображение"): with gr.Column(): prompt2 = gr.Image(show_label=True, interactive=True, type='filepath', label='Исходное изображение:') prompt12 = gr.Textbox(placeholder="Введите описание видео...", show_label=True, label='Описание видео (опционально):', lines=3) motion2 = gr.Dropdown(value="Приближение →←", interactive=True, show_label=True, label="Движение камеры:", choices=[ "Приближение →←", "Отдаление ←→", "Вверх ↑", "Вниз ↓", "Влево ←", "Вправо →", "По часовой стрелке ⟳", "Против часовой стрелки ⟲"]) with gr.Column(): text_button2 = gr.Button("Анимировать изображение", variant='primary', elem_id="generate") with gr.Column(): video_output2 = gr.Video(show_label=True, label='Результат:', type="file") text_button2.click(animate_img, inputs=[prompt2], outputs=video_output2) demo.queue(concurrency_count=12) demo.launch()