|
import torch |
|
from xora.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder |
|
from xora.models.transformers.transformer3d import Transformer3DModel |
|
from xora.models.transformers.symmetric_patchifier import SymmetricPatchifier |
|
from xora.schedulers.rf import RectifiedFlowScheduler |
|
from xora.pipelines.pipeline_video_pixart_alpha import VideoPixArtAlphaPipeline |
|
from pathlib import Path |
|
from transformers import T5EncoderModel, T5Tokenizer |
|
import safetensors.torch |
|
import json |
|
import argparse |
|
from xora.utils.conditioning_method import ConditioningMethod |
|
import os |
|
import numpy as np |
|
import cv2 |
|
from PIL import Image |
|
import random |
|
|
|
|
|
def load_vae(vae_dir): |
|
vae_ckpt_path = vae_dir / "diffusion_pytorch_model.safetensors" |
|
vae_config_path = vae_dir / "config.json" |
|
with open(vae_config_path, "r") as f: |
|
vae_config = json.load(f) |
|
vae = CausalVideoAutoencoder.from_config(vae_config) |
|
vae_state_dict = safetensors.torch.load_file(vae_ckpt_path) |
|
vae.load_state_dict(vae_state_dict) |
|
return vae.cuda().to(torch.bfloat16) |
|
|
|
|
|
def load_unet(unet_dir): |
|
unet_ckpt_path = unet_dir / "diffusion_pytorch_model.safetensors" |
|
unet_config_path = unet_dir / "config.json" |
|
transformer_config = Transformer3DModel.load_config(unet_config_path) |
|
transformer = Transformer3DModel.from_config(transformer_config) |
|
unet_state_dict = safetensors.torch.load_file(unet_ckpt_path) |
|
transformer.load_state_dict(unet_state_dict, strict=True) |
|
return transformer.cuda() |
|
|
|
|
|
def load_scheduler(scheduler_dir): |
|
scheduler_config_path = scheduler_dir / "scheduler_config.json" |
|
scheduler_config = RectifiedFlowScheduler.load_config(scheduler_config_path) |
|
return RectifiedFlowScheduler.from_config(scheduler_config) |
|
|
|
|
|
def center_crop_and_resize(frame, target_height, target_width): |
|
h, w, _ = frame.shape |
|
aspect_ratio_target = target_width / target_height |
|
aspect_ratio_frame = w / h |
|
if aspect_ratio_frame > aspect_ratio_target: |
|
new_width = int(h * aspect_ratio_target) |
|
x_start = (w - new_width) // 2 |
|
frame_cropped = frame[:, x_start : x_start + new_width] |
|
else: |
|
new_height = int(w / aspect_ratio_target) |
|
y_start = (h - new_height) // 2 |
|
frame_cropped = frame[y_start : y_start + new_height, :] |
|
frame_resized = cv2.resize(frame_cropped, (target_width, target_height)) |
|
return frame_resized |
|
|
|
|
|
def load_video_to_tensor_with_resize(video_path, target_height=512, target_width=768): |
|
cap = cv2.VideoCapture(video_path) |
|
frames = [] |
|
while True: |
|
ret, frame = cap.read() |
|
if not ret: |
|
break |
|
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
frame_resized = center_crop_and_resize(frame_rgb, target_height, target_width) |
|
frames.append(frame_resized) |
|
cap.release() |
|
video_np = (np.array(frames) / 127.5) - 1.0 |
|
video_tensor = torch.tensor(video_np).permute(3, 0, 1, 2).float() |
|
return video_tensor |
|
|
|
|
|
def load_image_to_tensor_with_resize(image_path, target_height=512, target_width=768): |
|
image = Image.open(image_path).convert("RGB") |
|
image_np = np.array(image) |
|
frame_resized = center_crop_and_resize(image_np, target_height, target_width) |
|
frame_tensor = torch.tensor(frame_resized).permute(2, 0, 1).float() |
|
frame_tensor = (frame_tensor / 127.5) - 1.0 |
|
|
|
return frame_tensor.unsqueeze(0).unsqueeze(2) |
|
|
|
|
|
def main(): |
|
parser = argparse.ArgumentParser( |
|
description="Load models from separate directories and run the pipeline." |
|
) |
|
|
|
|
|
parser.add_argument( |
|
"--ckpt_dir", |
|
type=str, |
|
required=True, |
|
help="Path to the directory containing unet, vae, and scheduler subdirectories", |
|
) |
|
parser.add_argument( |
|
"--video_path", type=str, help="Path to the input video file (first frame used)" |
|
) |
|
parser.add_argument("--image_path", type=str, help="Path to the input image file") |
|
parser.add_argument("--seed", type=int, default="171198") |
|
|
|
|
|
parser.add_argument( |
|
"--num_inference_steps", type=int, default=40, help="Number of inference steps" |
|
) |
|
parser.add_argument( |
|
"--num_images_per_prompt", |
|
type=int, |
|
default=1, |
|
help="Number of images per prompt", |
|
) |
|
parser.add_argument( |
|
"--guidance_scale", |
|
type=float, |
|
default=3, |
|
help="Guidance scale for the pipeline", |
|
) |
|
parser.add_argument( |
|
"--height", type=int, default=512, help="Height of the output video frames" |
|
) |
|
parser.add_argument( |
|
"--width", type=int, default=768, help="Width of the output video frames" |
|
) |
|
parser.add_argument( |
|
"--num_frames", |
|
type=int, |
|
default=121, |
|
help="Number of frames to generate in the output video", |
|
) |
|
parser.add_argument( |
|
"--frame_rate", type=int, default=25, help="Frame rate for the output video" |
|
) |
|
|
|
|
|
parser.add_argument( |
|
"--prompt", |
|
type=str, |
|
default='A man wearing a black leather jacket and blue jeans is riding a Harley Davidson motorcycle down a paved road. The man has short brown hair and is wearing a black helmet. The motorcycle is a dark red color with a large front fairing. The road is surrounded by green grass and trees. There is a gas station on the left side of the road with a red and white sign that says "Oil" and "Diner".', |
|
help="Text prompt to guide generation", |
|
) |
|
parser.add_argument( |
|
"--negative_prompt", |
|
type=str, |
|
default="worst quality, inconsistent motion, blurry, jittery, distorted", |
|
help="Negative prompt for undesired features", |
|
) |
|
|
|
args = parser.parse_args() |
|
|
|
|
|
ckpt_dir = Path(args.ckpt_dir) |
|
unet_dir = ckpt_dir / "unet" |
|
vae_dir = ckpt_dir / "vae" |
|
scheduler_dir = ckpt_dir / "scheduler" |
|
|
|
|
|
vae = load_vae(vae_dir) |
|
unet = load_unet(unet_dir) |
|
scheduler = load_scheduler(scheduler_dir) |
|
patchifier = SymmetricPatchifier(patch_size=1) |
|
text_encoder = T5EncoderModel.from_pretrained( |
|
"PixArt-alpha/PixArt-XL-2-1024-MS", subfolder="text_encoder" |
|
).to("cuda") |
|
tokenizer = T5Tokenizer.from_pretrained( |
|
"PixArt-alpha/PixArt-XL-2-1024-MS", subfolder="tokenizer" |
|
) |
|
|
|
|
|
submodel_dict = { |
|
"transformer": unet, |
|
"patchifier": patchifier, |
|
"text_encoder": text_encoder, |
|
"tokenizer": tokenizer, |
|
"scheduler": scheduler, |
|
"vae": vae, |
|
} |
|
|
|
pipeline = VideoPixArtAlphaPipeline(**submodel_dict).to("cuda") |
|
|
|
|
|
if args.video_path: |
|
media_items = load_video_to_tensor_with_resize( |
|
args.video_path, args.height, args.width |
|
).unsqueeze(0) |
|
elif args.image_path: |
|
media_items = load_image_to_tensor_with_resize( |
|
args.image_path, args.height, args.width |
|
) |
|
else: |
|
raise ValueError("Either --video_path or --image_path must be provided.") |
|
|
|
|
|
sample = { |
|
"prompt": args.prompt, |
|
"prompt_attention_mask": None, |
|
"negative_prompt": args.negative_prompt, |
|
"negative_prompt_attention_mask": None, |
|
"media_items": media_items, |
|
} |
|
|
|
random.seed(args.seed) |
|
np.random.seed(args.seed) |
|
torch.manual_seed(args.seed) |
|
torch.cuda.manual_seed(args.seed) |
|
generator = torch.Generator(device="cuda").manual_seed(args.seed) |
|
|
|
images = pipeline( |
|
num_inference_steps=args.num_inference_steps, |
|
num_images_per_prompt=args.num_images_per_prompt, |
|
guidance_scale=args.guidance_scale, |
|
generator=generator, |
|
output_type="pt", |
|
callback_on_step_end=None, |
|
height=args.height, |
|
width=args.width, |
|
num_frames=args.num_frames, |
|
frame_rate=args.frame_rate, |
|
**sample, |
|
is_video=True, |
|
vae_per_channel_normalize=True, |
|
conditioning_method=ConditioningMethod.FIRST_FRAME, |
|
).images |
|
|
|
|
|
def get_unique_filename(base, ext, dir=".", index_range=1000): |
|
for i in range(index_range): |
|
filename = os.path.join(dir, f"{base}_{i}{ext}") |
|
if not os.path.exists(filename): |
|
return filename |
|
raise FileExistsError( |
|
f"Could not find a unique filename after {index_range} attempts." |
|
) |
|
|
|
for i in range(images.shape[0]): |
|
video_np = images.squeeze(0).permute(1, 2, 3, 0).cpu().float().numpy() |
|
video_np = (video_np * 255).astype(np.uint8) |
|
fps = args.frame_rate |
|
height, width = video_np.shape[1:3] |
|
output_filename = get_unique_filename(f"video_output_{i}", ".mp4", ".") |
|
|
|
out = cv2.VideoWriter( |
|
output_filename, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height) |
|
) |
|
|
|
for frame in video_np[..., ::-1]: |
|
out.write(frame) |
|
|
|
out.release() |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
|