---
license: mit
tags:
- pytorch
- stable-diffusion
- text2Image
- stabilityai/stable-diffusion-2-1
datasets:
- xchuan/text2image-fupo
language:
- en
base_model:
- stabilityai/stable-diffusion-2-1
pipeline_tag: text-to-image
---

# This LoRA was trained on stabilityai/stable-diffusion-2-1

## Training code

```python
import os
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

from datasets import load_dataset

dataset = load_dataset("xchuan/text2image-fupo", split="train")

from transformers import CLIPTokenizer
from huggingface_hub import login

# ========== LoRA library ==========
from peft import LoraConfig, PeftModel

login(token="replace with your own token", add_to_git_credential=True)

weight_dtype = torch.bfloat16
train_batch_size = 4
snr_gamma = 5.0  # SNR gamma: coefficient for the SNR-weighted loss (float, so weight tensors stay floating point)

# Set random seeds for reproducibility
seed = 1126
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

# Optimizer parameters
unet_learning_rate = 1e-6  # learning rate for the UNet LoRA parameters
text_encoder_learning_rate = 1e-4  # learning rate for the text encoder (unused here; the text encoder stays frozen)

# Learning-rate scheduler parameters
lr_scheduler_name = "cosine_with_restarts"  # cosine annealing with restarts: decay the LR, then periodically restart it
lr_warmup_steps = 100  # ramp the LR up to its maximum over the first 100 steps
max_train_steps = 500  # total number of training steps
num_cycles = 1  # number of cosine decay-and-restart cycles over the course of training

pretrained_model_name_or_path = "stabilityai/stable-diffusion-2-1"

# LoRA configuration
unet_lora_config = LoraConfig(
    r=32,  # LoRA rank: the dimension of the low-rank update matrices
    lora_alpha=16,  # scaling factor controlling how strongly the LoRA weights affect the model
    init_lora_weights="gaussian",
    target_modules=["to_k", "to_q", "to_v", "to_out.0"],
    lora_dropout=0,  # LoRA dropout probability; 0 disables dropout
)

from torchvision import transforms
from torch.utils.data import DataLoader

resolution = 512

train_transform = transforms.Compose([
    transforms.Resize(resolution, interpolation=transforms.InterpolationMode.BILINEAR),  # resize the image
    transforms.CenterCrop(resolution),  # center-crop to a square
    transforms.RandomHorizontalFlip(),  # random horizontal flip
    transforms.ToTensor(),  # convert the image to a tensor in [0, 1]
    transforms.Normalize([0.5], [0.5]),  # map pixel values to [-1, 1], as the SD VAE expects
])

# Load the tokenizer once, rather than inside collate_fn for every example
tokenizer = CLIPTokenizer.from_pretrained(pretrained_model_name_or_path, subfolder="tokenizer")

def collate_fn(examples):
    pixel_values = []
    input_ids = []

    for example in examples:
        image_tensor = train_transform(example["image"])
        if not isinstance(image_tensor, torch.Tensor):
            print(f"Expected Tensor, got {type(image_tensor)} instead.")
            continue
        pixel_values.append(image_tensor)

        input_text = "fupo:" + example["text"]
        encode_text = tokenizer(input_text, return_tensors="pt", padding="max_length", truncation=True)
        input_ids.append(encode_text["input_ids"].squeeze(0))

    # If no valid images remain, return empty tensors
    if len(pixel_values) == 0:
        return {"pixel_values": torch.empty(0), "input_ids": torch.empty(0)}

    pixel_values = torch.stack(pixel_values, dim=0).float()
    input_ids = torch.stack(input_ids, dim=0)
    return {"pixel_values": pixel_values, "input_ids": input_ids}

train_dataloader = DataLoader(dataset, shuffle=True, collate_fn=collate_fn, batch_size=train_batch_size)
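# Optional sanity check (commented out), a minimal sketch assuming every image
# decodes as RGB: with train_batch_size=4, resolution=512, and CLIP's default
# model_max_length of 77, one batch should have the shapes noted below.
# batch = next(iter(train_dataloader))
# print(batch["pixel_values"].shape)  # expected: torch.Size([4, 3, 512, 512])
# print(batch["input_ids"].shape)     # expected: torch.Size([4, 77])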
from diffusers import DDPMScheduler, AutoencoderKL, UNet2DConditionModel, StableDiffusionPipeline
from transformers import CLIPTextModel

def prepare_lora_model(unet_lora_config, pretrained_model_name_or_path, model_path=None, resume=False, merge_lora=False):
    """
    (1) Goal:
        - Load the full Stable Diffusion stack, including the LoRA layers, and merge the LoRA weights
          if requested. This covers the tokenizer, noise scheduler, UNet, VAE, and text encoder.
    (2) Parameters:
        - unet_lora_config: LoraConfig, the LoRA configuration object
        - pretrained_model_name_or_path: str, model name or path on the Hugging Face Hub
        - model_path: str, path to previously trained weights
        - resume: bool, whether to resume from a previous run
        - merge_lora: bool, whether to merge the LoRA weights for inference
    (3) Returns:
        - tokenizer: CLIPTokenizer
        - noise_scheduler: DDPMScheduler
        - unet: UNet2DConditionModel
        - vae: AutoencoderKL
        - text_encoder: CLIPTextModel
    """
    # Noise scheduler: controls how noise is added to and removed from the latents
    noise_scheduler = DDPMScheduler.from_pretrained(pretrained_model_name_or_path, subfolder="scheduler")

    # Tokenizer: converts text captions into token ids
    tokenizer = CLIPTokenizer.from_pretrained(
        pretrained_model_name_or_path,
        subfolder="tokenizer"
    )

    # CLIP text encoder: converts captions into feature vectors
    text_encoder = CLIPTextModel.from_pretrained(
        pretrained_model_name_or_path,
        torch_dtype=weight_dtype,
        subfolder="text_encoder"
    )

    # VAE: maps images to and from the latent space
    vae = AutoencoderKL.from_pretrained(
        pretrained_model_name_or_path,
        subfolder="vae"
    )

    # UNet: the denoising network at the core of the diffusion model
    unet = UNet2DConditionModel.from_pretrained(
        pretrained_model_name_or_path,
        torch_dtype=weight_dtype,
        subfolder="unet"
    )

    # Freeze the base model parameters
    vae.requires_grad_(False)
    text_encoder.requires_grad_(False)
    unet.requires_grad_(False)

    # If resuming, load the weights from the previous run
    if resume:
        if model_path is None or not os.path.exists(model_path):
            raise ValueError("A valid model_path must be provided when resume is True")

        # Load the LoRA model with PEFT's from_pretrained
        # text_encoder = PeftModel.from_pretrained(text_encoder, os.path.join(model_path, "text_encoder"))
        unet = PeftModel.from_pretrained(unet, os.path.join(model_path, "unet"))

        # Make only the LoRA parameters in the target modules trainable again
        target_modules = ["to_k", "to_q", "to_v", "to_out.0"]
        for name, param in unet.named_parameters():
            if any(target_module in name for target_module in target_modules):
                param.requires_grad = True

        print(f"✅ Restored model weights from {model_path}")
    else:
        # Apply the LoRA configuration to the UNet
        unet.add_adapter(unet_lora_config)

    # Print the number of trainable parameters
    print("📊 UNet trainable parameters:")
    trainable_params = 0
    for name, param in unet.named_parameters():
        if param.requires_grad:
            param_count = param.numel()  # number of elements in this parameter tensor
            trainable_params += param_count
            # print(f"trainable: {name}, shape: {param.shape}, count: {param_count}")
    print(f"Total LoRA trainable parameters: {trainable_params}")

    if merge_lora:
        # Merge the LoRA weights into the base model; only for inference
        # text_encoder = text_encoder.merge_and_unload()
        unet = unet.merge_and_unload()

        # Switch to evaluation mode
        text_encoder.eval()
        unet.eval()

    # Move the models to the GPU and set the weight dtype
    unet.to(device, dtype=weight_dtype)
    vae.to(device, dtype=weight_dtype)
    text_encoder.to(device, dtype=weight_dtype)

    return tokenizer, noise_scheduler, unet, vae, text_encoder

def prepare_optimizer(unet, text_encoder, unet_learning_rate=5e-4, text_encoder_learning_rate=1e-4):
    # Collect the trainable LoRA parameters from the UNet
    unet_lora_layers = [p for p in unet.parameters() if p.requires_grad]

    # Group the trainable parameters and assign per-group learning rates
    trainable_params = [
        {"params": unet_lora_layers, "lr": unet_learning_rate},
    ]

    # Use the AdamW optimizer
    optimizer = torch.optim.AdamW(trainable_params)
    return optimizer

from diffusers.optimization import get_scheduler
from diffusers.training_utils import compute_snr

project_name = "fupo"
dataset_name = "fupo"

# Root and main directories
root_dir = "./"  # current directory
main_dir = os.path.join(root_dir, "SD-2-1")  # main directory

# Project directories
project_dir = os.path.join(main_dir, project_name)
model_path = os.path.join(project_dir, "logs", "checkpoint-last")

# Prepare the models
tokenizer, noise_scheduler, unet, vae, text_encoder = prepare_lora_model(
    unet_lora_config,
    pretrained_model_name_or_path,
    model_path,
    resume=False,
    merge_lora=False
)
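# Optional sanity check (commented out): after add_adapter, the only trainable
# UNet parameters should be the injected LoRA matrices, whose names contain
# "lora_A"/"lora_B" under the to_q/to_k/to_v/to_out.0 projections.
# lora_param_names = [n for n, p in unet.named_parameters() if p.requires_grad]
# print(lora_param_names[:4])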
# Prepare the optimizer
optimizer = prepare_optimizer(
    unet,
    text_encoder,
    unet_learning_rate=unet_learning_rate,
    text_encoder_learning_rate=text_encoder_learning_rate
)

# Set up the learning-rate scheduler
lr_scheduler = get_scheduler(
    lr_scheduler_name,
    optimizer=optimizer,
    num_warmup_steps=lr_warmup_steps,
    num_training_steps=max_train_steps,
    num_cycles=num_cycles
)

print("✅ Models and optimizer are ready! Training can begin.")

import math
from tqdm.auto import tqdm
import torch.nn.functional as F
from peft.utils import get_peft_model_state_dict
from diffusers.utils import convert_state_dict_to_diffusers

accumulation_steps = 4  # gradient accumulation steps
max_norm = 0.5
output_folder = os.path.join(project_dir, "logs")

# Disable tokenizer parallelism to avoid warnings
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Initialize training state
global_step = 0
best_loss = float("inf")  # best (lowest) loss seen so far

# Progress bar over the training steps
progress_bar = tqdm(
    range(max_train_steps),  # sized by max_train_steps
    desc="Training steps",
)

# Training loop
for epoch in range(math.ceil(max_train_steps / len(train_dataloader))):
    # Calling train() inside the loop is necessary if you add evaluation during training
    unet.train()

    for step, batch in enumerate(train_dataloader):
        if global_step >= max_train_steps:
            break

        # Encode the images into latent representations
        latents = vae.encode(batch["pixel_values"].to(device, dtype=weight_dtype)).latent_dist.sample()
        latents = latents * vae.config.scaling_factor  # scale by the VAE's scaling factor

        # Add noise to the latents to produce noisy inputs
        noise = torch.randn_like(latents)  # random noise with the same shape as the latents
        timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (latents.shape[0],), device=device).long()
        noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)

        # Get the text embeddings
        encoder_hidden_states = text_encoder(batch["input_ids"].to(device), return_dict=False)[0]
        assert encoder_hidden_states is not None, "Encoder hidden states should not be None"

        # Compute the prediction target
        if noise_scheduler.config.prediction_type == "epsilon":
            target = noise  # predict the noise
        elif noise_scheduler.config.prediction_type == "v_prediction":
            target = noise_scheduler.get_velocity(latents, noise, timesteps)  # predict the velocity

        # UNet forward pass
        with torch.autograd.detect_anomaly():
            model_pred = unet(noisy_latents, timesteps, encoder_hidden_states, return_dict=False)[0]
            assert model_pred is not None, "Model prediction should not be None"

            # Compute the loss
            if not snr_gamma:
                loss = F.mse_loss(model_pred.float(), target.float(), reduction="mean")
            else:
                # Compute the signal-to-noise ratio (SNR) and weight the MSE loss by min(SNR, gamma)
                snr = compute_snr(noise_scheduler, timesteps)
                mse_loss_weights = torch.stack([snr, snr_gamma * torch.ones_like(timesteps)], dim=1).min(dim=1)[0]
                if noise_scheduler.config.prediction_type == "epsilon":
                    mse_loss_weights = mse_loss_weights / snr
                elif noise_scheduler.config.prediction_type == "v_prediction":
                    mse_loss_weights = mse_loss_weights / (snr + 1)

                # Weighted MSE loss
                loss = F.mse_loss(model_pred.float(), target.float(), reduction="none")
                loss = loss.mean(dim=list(range(1, len(loss.shape)))) * mse_loss_weights
                loss = loss.mean()

            # Backpropagate; scale by accumulation_steps so the accumulated gradients average correctly
            (loss / accumulation_steps).backward()

        # Gradient accumulation: clip and step the optimizer every accumulation_steps batches
        if (global_step + 1) % accumulation_steps == 0:
            torch.nn.utils.clip_grad_norm_(unet.parameters(), max_norm)
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

        progress_bar.update(1)
        global_step += 1

        if global_step % 100 == 0:
            # Save the model with the lowest loss seen so far
            if loss.item() < best_loss:
                best_loss = loss.item()
                save_path = os.path.join(output_folder, "best_checkpoint")
                os.makedirs(save_path, exist_ok=True)

                # Export the LoRA weights in diffusers format
                unet_lora_state_dict = convert_state_dict_to_diffusers(get_peft_model_state_dict(unet))
                StableDiffusionPipeline.save_lora_weights(
                    save_directory=save_path,
                    unet_lora_layers=unet_lora_state_dict,
                    safe_serialization=True,
                )
                # text_encoder.save_pretrained(os.path.join(save_path, "text_encoder"))
                print(f"💾 Lowest-loss model saved to {save_path}, current loss: {best_loss}")

# Save the final model to checkpoint-last
save_path = os.path.join(output_folder, "checkpoint-last")
os.makedirs(save_path, exist_ok=True)

unet_lora_state_dict = convert_state_dict_to_diffusers(get_peft_model_state_dict(unet))
StableDiffusionPipeline.save_lora_weights(
    save_directory=save_path,
    unet_lora_layers=unet_lora_state_dict,
    safe_serialization=True,
)
print(f"💾 Final model saved to {save_path}")
```
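## Inference example

The training script above saves the LoRA weights in diffusers format, so they can be loaded back onto the base pipeline with `load_lora_weights`. The sketch below assumes the checkpoint directory produced by the script and reuses the `fupo:` prefix that training prepended to every caption; the prompt text and generation settings (`num_inference_steps`, `guidance_scale`) are illustrative, not values from the training run.

```python
import torch
from diffusers import StableDiffusionPipeline

device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the base model, then attach the trained LoRA weights.
pipe = StableDiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-2-1",
    torch_dtype=torch.bfloat16,
)
pipe.load_lora_weights("./SD-2-1/fupo/logs/checkpoint-last")  # directory written by the training script
pipe = pipe.to(device)

# Captions were prefixed with "fupo:" during training, so reuse that trigger here.
prompt = "fupo: a portrait photo of a woman"  # hypothetical prompt; replace with your own
image = pipe(prompt, num_inference_steps=30, guidance_scale=7.5).images[0]
image.save("fupo_sample.png")
```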