ai-tube-model-ltxv-1 / xora /examples /image_to_video.py
Sapir's picture
Lint: added ruff.
325137b
raw
history blame
9.17 kB
import torch
from xora.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder
from xora.models.transformers.transformer3d import Transformer3DModel
from xora.models.transformers.symmetric_patchifier import SymmetricPatchifier
from xora.schedulers.rf import RectifiedFlowScheduler
from xora.pipelines.pipeline_video_pixart_alpha import VideoPixArtAlphaPipeline
from pathlib import Path
from transformers import T5EncoderModel, T5Tokenizer
import safetensors.torch
import json
import argparse
from xora.utils.conditioning_method import ConditioningMethod
import os
import numpy as np
import cv2
from PIL import Image
import random
def load_vae(vae_dir):
    """Build the causal video autoencoder stored in ``vae_dir``.

    ``vae_dir`` must support the ``/`` path operator (e.g. ``pathlib.Path``)
    and contain ``config.json`` plus ``diffusion_pytorch_model.safetensors``.

    Returns the model with its weights loaded, moved to CUDA and cast to
    ``torch.bfloat16``.
    """
    config_file = vae_dir / "config.json"
    weights_file = vae_dir / "diffusion_pytorch_model.safetensors"

    with open(config_file, "r") as fh:
        config = json.load(fh)

    model = CausalVideoAutoencoder.from_config(config)
    model.load_state_dict(safetensors.torch.load_file(weights_file))

    # Move to the GPU first, then cast, mirroring the original call order.
    model = model.cuda()
    return model.to(torch.bfloat16)
def load_unet(unet_dir):
    """Build the 3D transformer stored in ``unet_dir`` and move it to CUDA.

    ``unet_dir`` must support the ``/`` path operator and contain
    ``config.json`` plus ``diffusion_pytorch_model.safetensors``. The
    checkpoint is loaded with ``strict=True``.
    """
    config = Transformer3DModel.load_config(unet_dir / "config.json")
    model = Transformer3DModel.from_config(config)

    weights = safetensors.torch.load_file(
        unet_dir / "diffusion_pytorch_model.safetensors"
    )
    model.load_state_dict(weights, strict=True)
    return model.cuda()
def load_scheduler(scheduler_dir):
    """Create a ``RectifiedFlowScheduler`` from ``scheduler_config.json``.

    ``scheduler_dir`` must support the ``/`` path operator and contain the
    ``scheduler_config.json`` file.
    """
    config = RectifiedFlowScheduler.load_config(
        scheduler_dir / "scheduler_config.json"
    )
    return RectifiedFlowScheduler.from_config(config)
def center_crop_and_resize(frame, target_height, target_width):
    """Center-crop ``frame`` to the target aspect ratio, then resize it.

    ``frame`` is an array whose shape unpacks as (height, width, channels).
    The longer dimension relative to the target aspect ratio is trimmed
    symmetrically around the center, and the crop is resized with
    ``cv2.resize`` to ``(target_width, target_height)``.
    """
    src_h, src_w, _ = frame.shape
    wanted_ratio = target_width / target_height

    if src_w / src_h > wanted_ratio:
        # Source is wider than the target: keep all rows, trim columns.
        crop_w = int(src_h * wanted_ratio)
        left = (src_w - crop_w) // 2
        cropped = frame[:, left : left + crop_w]
    else:
        # Source is taller than (or equal to) the target: trim rows.
        crop_h = int(src_w / wanted_ratio)
        top = (src_h - crop_h) // 2
        cropped = frame[top : top + crop_h, :]

    return cv2.resize(cropped, (target_width, target_height))
def load_video_to_tensor_with_resize(video_path, target_height=512, target_width=768):
    """Decode every frame of a video into one normalized float tensor.

    Each decoded frame is converted from BGR to RGB, then center-cropped and
    resized via ``center_crop_and_resize``. The stacked frames are rescaled
    with ``x / 127.5 - 1.0`` (maps 8-bit values to [-1, 1]).

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        target_height: Height of every output frame.
        target_width: Width of every output frame.

    Returns:
        A float32 tensor laid out as (channels, frames, height, width).

    Raises:
        ValueError: If no frame could be read (missing, unreadable or empty
            video). Without this check the failure surfaces later as an
            obscure ``permute`` dimension error.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(
                center_crop_and_resize(frame_rgb, target_height, target_width)
            )
    finally:
        # Always free the capture handle, even if decoding or resizing raised.
        cap.release()

    if not frames:
        raise ValueError(f"Could not read any frames from video: {video_path}")

    # (frames, H, W, C) in [0, 255] -> [-1, 1]
    video_np = (np.array(frames) / 127.5) - 1.0
    # Reorder to (C, frames, H, W) and cast to float32.
    video_tensor = torch.tensor(video_np).permute(3, 0, 1, 2).float()
    return video_tensor
def load_image_to_tensor_with_resize(image_path, target_height=512, target_width=768):
    """Load one image as a normalized 5D float tensor.

    The image is opened with PIL, converted to RGB, center-cropped and
    resized via ``center_crop_and_resize``, then rescaled with
    ``x / 127.5 - 1.0``.

    Returns:
        A float32 tensor of shape (1, 3, 1, target_height, target_width),
        i.e. (batch, channels, num_frames, height, width).
    """
    rgb = np.array(Image.open(image_path).convert("RGB"))
    resized = center_crop_and_resize(rgb, target_height, target_width)

    # (H, W, C) -> (C, H, W), as float32 in [-1, 1]
    chw = torch.tensor(resized).permute(2, 0, 1).float()
    chw = (chw / 127.5) - 1.0

    # Insert the frame axis after channels, then the batch axis in front:
    # (C, H, W) -> (C, 1, H, W) -> (1, C, 1, H, W)
    return chw.unsqueeze(1).unsqueeze(0)
def _get_unique_filename(base, ext, directory=".", index_range=1000):
    """Return the first ``{base}_{i}{ext}`` path in ``directory`` not yet on disk.

    Raises:
        FileExistsError: If all ``index_range`` candidate names are taken.
    """
    for i in range(index_range):
        filename = os.path.join(directory, f"{base}_{i}{ext}")
        if not os.path.exists(filename):
            return filename
    raise FileExistsError(
        f"Could not find a unique filename after {index_range} attempts."
    )


def main():
    """Command-line entry point: load the models, run the pipeline, save videos.

    Steps:
      1. Parse CLI arguments (checkpoint directory, conditioning media,
         sampling parameters, prompts).
      2. Load the VAE, transformer and scheduler from ``--ckpt_dir`` and the
         T5 text encoder/tokenizer from the PixArt-alpha hub repository.
      3. Load the conditioning media (``--video_path`` takes precedence over
         ``--image_path``).
      4. Seed all RNGs, run the pipeline and write one ``.mp4`` per generated
         batch item into the current directory.

    Raises:
        ValueError: If neither ``--video_path`` nor ``--image_path`` is given.
    """
    parser = argparse.ArgumentParser(
        description="Load models from separate directories and run the pipeline."
    )

    # Directories
    parser.add_argument(
        "--ckpt_dir",
        type=str,
        required=True,
        help="Path to the directory containing unet, vae, and scheduler subdirectories",
    )
    parser.add_argument(
        "--video_path", type=str, help="Path to the input video file (first frame used)"
    )
    parser.add_argument("--image_path", type=str, help="Path to the input image file")
    # Default is a real int (the original passed the string "171198").
    parser.add_argument("--seed", type=int, default=171198)

    # Pipeline parameters
    parser.add_argument(
        "--num_inference_steps", type=int, default=40, help="Number of inference steps"
    )
    parser.add_argument(
        "--num_images_per_prompt",
        type=int,
        default=1,
        help="Number of images per prompt",
    )
    parser.add_argument(
        "--guidance_scale",
        type=float,
        default=3,
        help="Guidance scale for the pipeline",
    )
    parser.add_argument(
        "--height", type=int, default=512, help="Height of the output video frames"
    )
    parser.add_argument(
        "--width", type=int, default=768, help="Width of the output video frames"
    )
    parser.add_argument(
        "--num_frames",
        type=int,
        default=121,
        help="Number of frames to generate in the output video",
    )
    parser.add_argument(
        "--frame_rate", type=int, default=25, help="Frame rate for the output video"
    )

    # Prompts
    parser.add_argument(
        "--prompt",
        type=str,
        default='A man wearing a black leather jacket and blue jeans is riding a Harley Davidson motorcycle down a paved road. The man has short brown hair and is wearing a black helmet. The motorcycle is a dark red color with a large front fairing. The road is surrounded by green grass and trees. There is a gas station on the left side of the road with a red and white sign that says "Oil" and "Diner".',
        help="Text prompt to guide generation",
    )
    parser.add_argument(
        "--negative_prompt",
        type=str,
        default="worst quality, inconsistent motion, blurry, jittery, distorted",
        help="Negative prompt for undesired features",
    )

    args = parser.parse_args()

    # Paths for the separate model directories
    ckpt_dir = Path(args.ckpt_dir)
    unet_dir = ckpt_dir / "unet"
    vae_dir = ckpt_dir / "vae"
    scheduler_dir = ckpt_dir / "scheduler"

    # Load models
    vae = load_vae(vae_dir)
    unet = load_unet(unet_dir)
    scheduler = load_scheduler(scheduler_dir)
    patchifier = SymmetricPatchifier(patch_size=1)
    text_encoder = T5EncoderModel.from_pretrained(
        "PixArt-alpha/PixArt-XL-2-1024-MS", subfolder="text_encoder"
    ).to("cuda")
    tokenizer = T5Tokenizer.from_pretrained(
        "PixArt-alpha/PixArt-XL-2-1024-MS", subfolder="tokenizer"
    )

    # Use submodels for the pipeline
    submodel_dict = {
        "transformer": unet,
        "patchifier": patchifier,
        "text_encoder": text_encoder,
        "tokenizer": tokenizer,
        "scheduler": scheduler,
        "vae": vae,
    }
    pipeline = VideoPixArtAlphaPipeline(**submodel_dict).to("cuda")

    # Load media (video or image); a video takes precedence when both are given.
    if args.video_path:
        # The video loader returns (C, F, H, W); add the batch axis.
        media_items = load_video_to_tensor_with_resize(
            args.video_path, args.height, args.width
        ).unsqueeze(0)
    elif args.image_path:
        media_items = load_image_to_tensor_with_resize(
            args.image_path, args.height, args.width
        )
    else:
        raise ValueError("Either --video_path or --image_path must be provided.")

    # Prepare input for the pipeline
    sample = {
        "prompt": args.prompt,
        "prompt_attention_mask": None,
        "negative_prompt": args.negative_prompt,
        "negative_prompt_attention_mask": None,
        "media_items": media_items,
    }

    # Seed every RNG in play so runs are repeatable for a given --seed.
    random.seed(args.seed)
    np.random.seed(args.seed)
    torch.manual_seed(args.seed)
    torch.cuda.manual_seed(args.seed)
    generator = torch.Generator(device="cuda").manual_seed(args.seed)

    images = pipeline(
        num_inference_steps=args.num_inference_steps,
        num_images_per_prompt=args.num_images_per_prompt,
        guidance_scale=args.guidance_scale,
        generator=generator,
        output_type="pt",
        callback_on_step_end=None,
        height=args.height,
        width=args.width,
        num_frames=args.num_frames,
        frame_rate=args.frame_rate,
        **sample,
        is_video=True,
        vae_per_channel_normalize=True,
        conditioning_method=ConditioningMethod.FIRST_FRAME,
    ).images

    # Save one output video per batch item.
    fps = args.frame_rate
    for i in range(images.shape[0]):
        # Index the i-th item. The original used images.squeeze(0), which
        # ignores i and fails in permute() as soon as the batch holds more
        # than one video. (C, F, H, W) -> (F, H, W, C)
        video_np = images[i].permute(1, 2, 3, 0).cpu().float().numpy()
        # Clip before the uint8 cast so values marginally outside [0, 1]
        # saturate instead of wrapping around.
        video_np = np.clip(video_np * 255, 0, 255).astype(np.uint8)

        height, width = video_np.shape[1:3]
        output_filename = _get_unique_filename(f"video_output_{i}", ".mp4", ".")

        out = cv2.VideoWriter(
            output_filename, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height)
        )
        try:
            # Reverse the channel axis (RGB -> BGR) for OpenCV. The reversed
            # view has negative strides, so pass a contiguous copy.
            for frame in video_np[..., ::-1]:
                out.write(np.ascontiguousarray(frame))
        finally:
            # Finalize the file even if a write fails midway.
            out.release()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()