Spaces:
Sleeping
Sleeping
import gradio as gr | |
import requests | |
from PIL import Image | |
from transformers import BlipProcessor, BlipForConditionalGeneration | |
import torch | |
from diffusers import AnimateDiffPipeline, LCMScheduler, MotionAdapter | |
from moviepy.editor import concatenate_videoclips, AudioFileClip | |
from moviepy.video.io.ImageSequenceClip import ImageSequenceClip | |
from transformers import AutoProcessor, MusicgenForConditionalGeneration | |
import scipy.io.wavfile | |
import re | |
import numpy as np | |
import os | |
from io import BytesIO | |
import tempfile | |
# 定义图像到文本函数 | |
def img2text(image): | |
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large") | |
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large") | |
inputs = processor(image, return_tensors="pt") | |
out = model.generate(**inputs) | |
caption = processor.decode(out[0], skip_special_tokens=True) | |
print(caption) | |
return caption | |
# 定义文本生成函数 | |
def text2text(user_input): | |
api_key = os.getenv ("openai_apikey") | |
base_url = "https://openrouter.ai/api/v1" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
data = { | |
"model": "openai/gpt-3.5-turbo", | |
"messages": [ | |
{ | |
"role": "system", | |
"content": ( | |
"You are an expert who is very good at writing stories. Please expand it into a continuous story based on the input, and logically cut the story into sentences. Each sentence is a scene (as many sentences and scenes as possible, and at least 10 sentences). Each sentence is required The content of the sentence description should be detailed and do not use rhetorical techniques, and no ambiguous words such as pronouns should appear in the sentence. Be as detailed as possible to accurately describe who is doing what, and the scene descriptions before and after should have a certain correlation. In addition, I require your answer to follow a certain format. Let me give you an example. For example, I enter: a dolphin jumping out of the water at sunset. " | |
"Your answer format: " | |
""" | |
[1] The sun nears the horizon, illuminating the calm sea surface with a warm glow. | |
[2] A dolphin swims swiftly below the calm sea surface, moving closer to the top. | |
[3] The dolphin uses its powerful tail fin to prepare for a leap out of the water. | |
[4] The dolphin's body starts to emerge from the water, exposing itself above the surface. | |
[5] The dolphin leaps completely out of the water, creating an arch with its body in the air. | |
[6] The dolphin rotates its body in the air before it begins its descent back to the water. | |
[7] The dolphin's head and back make the first contact with the water, creating a splash. | |
[8] The dolphin fully submerges under the water, causing the splashes around it to slowly disperse. | |
[9] The dolphin moves forward underwater, gradually disappearing into the dimming light of the sunset. | |
""" | |
"My input is as follows, please answer me in the format without adding any other words." | |
) | |
}, | |
{ "role": "user", "content": user_input } | |
] | |
} | |
response = requests.post(f"{base_url}/chat/completions", headers=headers, json=data) | |
response.raise_for_status() | |
completion = response.json() | |
print(completion['choices'][0]['message']['content']) | |
return completion['choices'][0]['message']['content'] | |
import torch | |
from diffusers import AnimateDiffPipeline, LCMScheduler, MotionAdapter | |
from diffusers.utils import export_to_gif | |
import re | |
def text2vid(input_text,desc = "4k, high resolution"): | |
# 使用正则表达式分割输入文本并提取句子 | |
sentences = re.findall(r'\[\d+\] (.+?)(?:\n|\Z)', input_text) | |
# 加载动作适配器和动画扩散管道 | |
adapter = MotionAdapter.from_pretrained("wangfuyun/AnimateLCM", config_file="wangfuyun/AnimateLCM/AnimateLCM/config.json", torch_dtype=torch.float16) | |
pipe = AnimateDiffPipeline.from_pretrained("emilianJR/epiCRealism", motion_adapter=adapter, torch_dtype=torch.float16) | |
pipe.scheduler = LCMScheduler.from_config(pipe.scheduler.config, beta_schedule="linear") | |
# 加载LoRA权重 | |
pipe.load_lora_weights("wangfuyun/AnimateLCM", weight_name="AnimateLCM_sd15_t2v_lora.safetensors", adapter_name="lcm-lora") | |
# 设置适配器并启用功能 | |
try: | |
pipe.set_adapters(["lcm-lora"], [0.8]) | |
except ValueError as e: | |
print("Ignoring the error:", str(e)) | |
pipe.enable_vae_slicing() | |
pipe.enable_model_cpu_offload() | |
all_frames = [] # 存储所有句子的所有帧 | |
for index, sentence in enumerate(sentences): | |
output = pipe( | |
#prompt=sentence + ", " + desc, | |
prompt=sentence + ", " + desc, | |
negative_prompt="bad quality, worse quality, low resolution", | |
num_frames=24, | |
guidance_scale=2.0, | |
num_inference_steps=6, | |
generator=torch.Generator("cpu").manual_seed(0) | |
) | |
frames = output.frames[0] | |
all_frames.extend(frames) # 添加每个句子的帧到all_frames | |
return all_frames | |
def text2vid_pro(input_text,desc = "4k, high resolution"): | |
# 使用正则表达式分割输入文本并提取句子 | |
sentences = re.findall(r'\[\d+\] (.+?)(?:\n|\Z)', input_text) | |
# 加载动作适配器和动画扩散管道 | |
adapter = MotionAdapter.from_pretrained("wangfuyun/AnimateLCM", config_file="wangfuyun/AnimateLCM/config.json", torch_dtype=torch.float16) | |
pipe = AnimateDiffPipeline.from_pretrained("emilianJR/epiCRealism", motion_adapter=adapter, torch_dtype=torch.float16) | |
pipe.scheduler = LCMScheduler.from_config(pipe.scheduler.config, beta_schedule="linear") | |
# 加载LoRA权重 | |
pipe.load_lora_weights("wangfuyun/AnimateLCM", weight_name="AnimateLCM_sd15_t2v_lora.safetensors", adapter_name="lcm-lora") | |
# 设置适配器并启用功能 | |
try: | |
pipe.set_adapters(["lcm-lora"], [0.8]) | |
except ValueError as e: | |
print("Ignoring the error:", str(e)) | |
pipe.enable_vae_slicing() | |
pipe.enable_model_cpu_offload() | |
# 循环遍历每个句子,生成动画并导出为GIF | |
for index, sentence in enumerate(sentences): | |
output = pipe( | |
#prompt=sentence + "," + desc , | |
prompt=sentence + ", cartoon", | |
negative_prompt="bad quality, worse quality, low resolution", | |
num_frames=24, | |
guidance_scale=2.0, | |
num_inference_steps=6, | |
generator=torch.Generator("cpu").manual_seed(0) | |
) | |
frames = output.frames[0] | |
export_to_gif(frames, f"{index+1}.gif") | |
def text2text_A(user_input): | |
# 设置API密钥和基础URL | |
api_key = os.getenv ("openai_apikey") | |
base_url = "https://openrouter.ai/api/v1" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
data = { | |
"model": "openai/gpt-3.5-turbo", | |
"messages": [ | |
{ | |
"role": "system", | |
"content": ( | |
"You are an expert in music criticism, please match this story with a suitable musical style based on my input and describe it, please make sure you follow my format output and do not add any other statements e.g. Input: in a small tavern everyone danced, the bartender poured drinks for everyone, everyone had a good time and was very happy and sang and danced. Output: 80s pop track with bassy drums and synth." | |
"Again, please make sure you follow the format of the output, here is my input:" | |
) | |
}, | |
{ "role": "user", "content": user_input } | |
] | |
} | |
response = requests.post(f"{base_url}/chat/completions", headers=headers, json=data) | |
response.raise_for_status() # 确保请求成功 | |
completion = response.json() | |
print(completion['choices'][0]['message']['content']) | |
return completion['choices'][0]['message']['content'] | |
# 定义文本到音频函数 | |
def text2audio(text_input, duration_seconds): | |
processor = AutoProcessor.from_pretrained("facebook/musicgen-small") | |
model = MusicgenForConditionalGeneration.from_pretrained("facebook/musicgen-small") | |
inputs = processor(text=[text_input], padding=True, return_tensors="pt") | |
max_new_tokens = int((duration_seconds / 5) * 256) | |
audio_values = model.generate(**inputs, max_new_tokens=max_new_tokens) | |
print(duration_seconds) | |
return audio_values[0, 0].numpy(), model.config.audio_encoder.sampling_rate | |
def text2audio_pro(text_input, duration_seconds): | |
processor = AutoProcessor.from_pretrained("facebook/musicgen-small") | |
model = MusicgenForConditionalGeneration.from_pretrained("facebook/musicgen-small") | |
inputs = processor(text=[text_input], padding=True, return_tensors="pt") | |
# Calculate max_new_tokens based on the desired duration | |
max_new_tokens = int((duration_seconds / 5) * 256) | |
audio_values = model.generate(**inputs, max_new_tokens=max_new_tokens) | |
# Save audio file | |
scipy.io.wavfile.write("bgm.wav", rate=model.config.audio_encoder.sampling_rate, data=audio_values[0, 0].numpy()) | |
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips | |
import glob | |
from transformers import AutoProcessor, MusicgenForConditionalGeneration | |
import scipy.io.wavfile | |
def video_generate_pro(img2text_input=" "): | |
# 设置视频帧率 | |
frame_rate = 24 # 可以修改这个值来设置不同的帧率 | |
# 获取所有GIF文件,假设它们位于同一文件夹并按名称排序 | |
gif_files = sorted(glob.glob('./*.gif')) | |
# 创建视频剪辑列表,每个GIF文件作为一个VideoFileClip | |
clips = [VideoFileClip(gif) for gif in gif_files] | |
# 连接视频剪辑 | |
final_clip = concatenate_videoclips(clips, method="compose") | |
# 输出视频文件 | |
final_clip.write_videofile('output_video.mp4', codec='libx264') | |
# 定义生成结果视频的函数 | |
def result_generate(video_clip, audio_clip): | |
video = video_clip.set_audio(audio_clip) | |
video_buffer = BytesIO() | |
video.write_videofile(video_buffer, codec="libx264", audio_codec="aac") | |
video_buffer.seek(0) | |
return video_buffer | |
from moviepy.editor import VideoFileClip, AudioFileClip | |
def result_generate_pro(): | |
# 加载视频文件 | |
video = VideoFileClip("output_video.mp4") | |
# 加载音频文件 | |
audio = AudioFileClip("bgm.wav") | |
# 将音频设置为视频的音频 | |
video = video.set_audio(audio) | |
# 导出新的视频文件 | |
video.write_videofile("result.mp4", codec="libx264", audio_codec="aac") | |
def generate_video_basic(image,desc): | |
# 获取图像描述 | |
text = img2text(image) | |
# 生成详细的文本场景描述 | |
sentences = text2text(text) | |
# 生成视频帧 | |
video_frames = text2vid(sentences,desc) | |
# 转换视频帧为numpy数组 | |
video_frames = [np.array(frame) for frame in video_frames] | |
# 创建视频片段 | |
video_clip = ImageSequenceClip(video_frames, fps=24) | |
video_duration = video_clip.duration | |
# 生成音频 | |
audio_text = text2text_A(text) | |
audio_data, audio_rate = text2audio(audio_text, video_clip.duration) | |
# 将音频数据写入临时文件 | |
with tempfile.NamedTemporaryFile(delete=True, suffix=".wav") as tmpfile: | |
scipy.io.wavfile.write(tmpfile, audio_rate, audio_data) | |
tmpfile.flush() # 确保数据已写入磁盘 | |
audio_clip = AudioFileClip(tmpfile.name) | |
# 将音频添加到视频中 | |
video_clip = video_clip.set_audio(audio_clip) | |
# 将视频写入临时文件 | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmpfile: | |
video_clip.write_videofile(tmpfile.name, codec="libx264", audio_codec="aac") | |
video_file_path = tmpfile.name | |
# 函数现在返回视频文件的路径,不再需要读取数据并删除 | |
return video_file_path | |
def generate_video_pro(image,desc): | |
# 获取图像描述 | |
text = img2text(image) | |
sentences = text2text(text) # 从文本生成结构化句子 | |
text2vid_pro(sentences, desc) # 从句子创建视频序列 | |
video_generate_pro() # 创建视频文件 | |
video = VideoFileClip("output_video.mp4") | |
duration = video.duration | |
print(duration) | |
audio_text = text2text_A(text) | |
text2audio_pro(audio_text,duration) | |
result_generate_pro() | |
return "result.mp4" | |
import traceback | |
def safe_generate_video(func, *args): | |
try: | |
# 尝试生成视频,调用传入的函数 | |
return func(*args), None | |
except Exception as e: | |
# 捕获任何异常并返回错误消息 | |
error_msg = f"An error occurred: {str(e)}" | |
# 可选:打印堆栈跟踪信息以便于调试 | |
print(traceback.format_exc()) | |
return None, error_msg | |
with gr.Blocks() as demo: | |
gr.Markdown("Upload an image and provide a description to generate a video.") | |
with gr.Tab("Basic Version"): | |
with gr.Row(): | |
image_input = gr.Image(type="pil") | |
description_input = gr.Textbox(label="Description", placeholder="Enter description here, e.g. '4k, high resolution'", lines=2) | |
with gr.Row(): | |
submit_button = gr.Button("Generate Basic Video") | |
video_output = gr.Video(label="Generated Video") | |
error_output = gr.Textbox(label="Error Messages", placeholder="No errors", lines=5) | |
submit_button.click( | |
lambda img, desc: safe_generate_video(generate_video_basic, img, desc), | |
inputs=[image_input, description_input], | |
outputs=[video_output, error_output] | |
) | |
with gr.Tab("Pro Version"): | |
with gr.Row(): | |
image_input_pro = gr.Image(type="pil") | |
description_input_pro = gr.Textbox(label="Description", placeholder="Enter description here, e.g. '4k, high resolution'", lines=2) | |
with gr.Row(): | |
submit_button_pro = gr.Button("Generate Pro Video") | |
video_output_pro = gr.Video(label="Generated Video") | |
error_output_pro = gr.Textbox(label="Error Messages", placeholder="No errors", lines=5) | |
submit_button_pro.click( | |
lambda img, desc: safe_generate_video(generate_video_pro, img, desc), | |
inputs=[image_input_pro, description_input_pro], | |
outputs=[video_output_pro, error_output_pro] | |
) | |
demo.launch() | |