{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "ca72c15d-7d1b-45af-b6d5-9abec82bd5bb", "metadata": {}, "source": [ "# InstantID: Zero-shot Identity-Preserving Generation using OpenVINO\n", "\n", "Recently, there has been significant progress in personalized image synthesis with methods such as Textual Inversion, DreamBooth, and LoRA.\n", "However, their real-world applicability is hindered by high storage demands, lengthy fine-tuning processes, and the need for multiple reference images. Conversely, existing ID embedding-based methods, while requiring only a single forward inference, face challenges: they either necessitate extensive fine-tuning across numerous model parameters, lack compatibility with community pre-trained models, or fail to maintain high face fidelity. \n", "\n", "[InstantID](https://instantid.github.io/) is a tuning-free method to achieve ID-preserving generation with only a single image, supporting various downstream tasks.\n", "![applications.png](https://github.com/InstantID/InstantID/blob/main/assets/applications.png?raw=true)\n", "\n", "Given only one reference ID image, InstantID aims to generate customized images with various poses or styles while ensuring high fidelity. The following figure provides an overview of the method. It incorporates three crucial components: \n", "\n", "1. An ID embedding that captures robust semantic face information; \n", "2. A lightweight adapted module with decoupled cross-attention, facilitating the use of an image as a visual prompt;\n", "3. An IdentityNet that encodes the detailed features from the reference facial image with additional spatial control.\n", "\n", "![instantid-components.png](https://instantid.github.io/static/documents/pipeline.png)\n", "\n", "InstantID differs from previous works in the following aspects: \n", "1. 
does not involve UNet training, so it can preserve the generation ability of the original text-to-image model and be compatible with existing pre-trained models and ControlNets in the community;\n", "2. does not require test-time tuning, so for a specific character, there is no need to collect multiple images for fine-tuning; only a single image needs to be inferred once;\n", "3. achieves better face fidelity and retains the editability of text.\n", "\n", "You can find more details about the approach on the [project web page](https://instantid.github.io/), in the [paper](https://arxiv.org/abs/2401.07519), and in the [original repository](https://github.com/InstantID/InstantID).\n", "\n", "In this tutorial, we consider how to use InstantID with OpenVINO. An additional part demonstrates how to run optimization with [NNCF](https://github.com/openvinotoolkit/nncf/) to speed up the pipeline.\n", "#### Table of contents:\n", "- [Prerequisites](#Prerequisites)\n", "- [Convert and prepare Face IdentityNet](#Convert-and-prepare-Face-IdentityNet)\n", "    - [Select Inference Device for Face Recognition](#Select-Inference-Device-for-Face-Recognition)\n", "    - [Perform Face Identity extraction](#Perform-Face-Identity-extraction)\n", "- [Prepare InstantID pipeline](#Prepare-InstantID-pipeline)\n", "- [Convert InstantID pipeline components to OpenVINO Intermediate Representation format](#Convert-InstantID-pipeline-components-to-OpenVINO-Intermediate-Representation-format)\n", "    - [ControlNet](#ControlNet)\n", "    - [Unet](#Unet)\n", "    - [VAE Decoder](#VAE-Decoder)\n", "    - [Text Encoders](#Text-Encoders)\n", "    - [Image Projection Model](#Image-Projection-Model)\n", "- [Prepare OpenVINO InstantID Pipeline](#Prepare-OpenVINO-InstantID-Pipeline)\n", "- [Run OpenVINO pipeline inference](#Run-OpenVINO-pipeline-inference)\n", "    - [Select inference device for InstantID](#Select-inference-device-for-InstantID)\n", "    - [Create pipeline](#Create-pipeline)\n", "    - [Run inference](#Run-inference)\n", "- 
[Quantization](#Quantization)\n", "    - [Prepare calibration datasets](#Prepare-calibration-datasets)\n", "    - [Run quantization](#Run-quantization)\n", "        - [Run ControlNet Quantization](#Run-ControlNet-Quantization)\n", "        - [Run UNet Hybrid Quantization](#Run-UNet-Hybrid-Quantization)\n", "        - [Run Weights Compression](#Run-Weights-Compression)\n", "    - [Compare model file sizes](#Compare-model-file-sizes)\n", "    - [Compare inference time of the FP16 and INT8 pipelines](#Compare-inference-time-of-the-FP16-and-INT8-pipelines)\n", "- [Interactive demo](#Interactive-demo)\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "a42ecb80-d4e9-4056-a648-fe42090210fd", "metadata": {}, "source": [ "## Prerequisites\n", "[back to top ⬆️](#Table-of-contents:)" ] }, { "cell_type": "code", "execution_count": 1, "id": "70df6ac1-94f7-4de3-bf55-49743b79e796", "metadata": {}, "outputs": [], "source": [ "from pathlib import Path\n", "import sys\n", "\n", "repo_dir = Path(\"InstantID\")\n", "\n", "if not repo_dir.exists():\n", "    !git clone https://github.com/InstantID/InstantID.git\n", "\n", "sys.path.append(str(repo_dir))" ] }, { "cell_type": "code", "execution_count": null, "id": "6fa0bd47-175f-4433-914b-1d0c241a6743", "metadata": {}, "outputs": [], "source": [ "%pip install -q \"openvino>=2023.3.0\" opencv-python transformers diffusers accelerate gdown \"scikit-image>=0.19.2\" \"gradio>=4.19\" \"nncf>=2.9.0\" \"datasets>=2.14.6\" \"peft==0.6.2\"" ] }, { "attachments": {}, "cell_type": "markdown", "id": "476ffd57-00cf-4f29-b3a0-8c087807a031", "metadata": {}, "source": [ "## Convert and prepare Face IdentityNet\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "To obtain face embeddings and pose key points, InstantID uses the [InsightFace](https://github.com/deepinsight/insightface) face analysis library. 
Its models are distributed in ONNX format and can be run with OpenVINO.\n", "To prepare the face image, we need to detect the bounding boxes and keypoints of the face using the RetinaFace model, crop the detected face, align the face location using landmarks, and pass each face to the ArcFace face embedding model to obtain the person's identity embeddings.\n", "\n", "The code below downloads the InsightFace Antelopev2 model kit and provides a simple interface compatible with InsightFace for getting face recognition results." ] }, { "cell_type": "code", "execution_count": 3, "id": "1fc7104a-52c6-4223-a291-a4546bd0d474", "metadata": {}, "outputs": [], "source": [ "MODELS_DIR = Path(\"models\")\n", "face_detector_path = MODELS_DIR / \"antelopev2\" / \"scrfd_10g_bnkps.onnx\"\n", "face_embeddings_path = MODELS_DIR / \"antelopev2\" / \"glintr100.onnx\"" ] }, { "cell_type": "code", "execution_count": 4, "id": "a46856da-b88c-473e-9745-d1bdc83c8c9d", "metadata": {}, "outputs": [], "source": [ "from zipfile import ZipFile\n", "import gdown\n", "\n", "archive_file = Path(\"antelopev2.zip\")\n", "\n", "# download and unpack the model kit if either model file is missing\n", "if not face_detector_path.exists() or not face_embeddings_path.exists():\n", "    if not archive_file.exists():\n", "        gdown.download(\n", "            \"https://drive.google.com/uc?id=18wEUfMNohBJ4K3Ly5wpTejPfDzp-8fI8\",\n", "            str(archive_file),\n", "        )\n", "    with ZipFile(archive_file, \"r\") as zip_face_models:\n", "        zip_face_models.extractall(MODELS_DIR)" ] }, { "cell_type": "code", "execution_count": 5, "id": "d6e1580d-2c29-40fb-a086-0f53287f6a1f", "metadata": {}, "outputs": [], "source": [ "import cv2\n", "import numpy as np\n", "from skimage import transform as trans\n", "\n", "\n", "def softmax(z):\n", "    assert len(z.shape) == 2\n", "    s = np.max(z, axis=1)\n", "    s = s[:, np.newaxis]  # necessary step to do broadcasting\n", "    e_x = np.exp(z - s)\n", "    div = np.sum(e_x, axis=1)\n", "    div = div[:, np.newaxis]  # ditto\n", "    return e_x / div\n", "\n", "\n", "def distance2bbox(points, 
distance, max_shape=None):\n", "    \"\"\"Decode distance prediction to bounding box.\n", "\n", "    Args:\n", "        points (np.ndarray): Shape (n, 2), [x, y].\n", "        distance (np.ndarray): Distance from the given point to 4\n", "            boundaries (left, top, right, bottom).\n", "        max_shape (tuple): Shape of the image.\n", "\n", "    Returns:\n", "        np.ndarray: Decoded bboxes.\n", "    \"\"\"\n", "    x1 = points[:, 0] - distance[:, 0]\n", "    y1 = points[:, 1] - distance[:, 1]\n", "    x2 = points[:, 0] + distance[:, 2]\n", "    y2 = points[:, 1] + distance[:, 3]\n", "    if max_shape is not None:\n", "        # inputs are NumPy arrays, so clip with np.clip\n", "        x1 = np.clip(x1, 0, max_shape[1])\n", "        y1 = np.clip(y1, 0, max_shape[0])\n", "        x2 = np.clip(x2, 0, max_shape[1])\n", "        y2 = np.clip(y2, 0, max_shape[0])\n", "    return np.stack([x1, y1, x2, y2], axis=-1)\n", "\n", "\n", "def distance2kps(points, distance, max_shape=None):\n", "    \"\"\"Decode distance prediction to keypoints.\n", "\n", "    Args:\n", "        points (np.ndarray): Shape (n, 2), [x, y].\n", "        distance (np.ndarray): Offsets from the given point to each\n", "            keypoint, stored as (x, y) pairs.\n", "        max_shape (tuple): Shape of the image.\n", "\n", "    Returns:\n", "        np.ndarray: Decoded keypoints.\n", "    \"\"\"\n", "    preds = []\n", "    for i in range(0, distance.shape[1], 2):\n", "        px = points[:, i % 2] + distance[:, i]\n", "        py = points[:, i % 2 + 1] + distance[:, i + 1]\n", "        if max_shape is not None:\n", "            px = np.clip(px, 0, max_shape[1])\n", "            py = np.clip(py, 0, max_shape[0])\n", "        preds.append(px)\n", "        preds.append(py)\n", "    return np.stack(preds, axis=-1)\n", "\n", "\n", "def prepare_input(image, mean, std, reverse_channels=True):\n", "    # callers pass (image, input_mean, input_std), so mean comes before std\n", "    normalized_image = (image.astype(np.float32) - mean) / std\n", "    if reverse_channels:\n", "        normalized_image = normalized_image[:, :, ::-1]\n", "    input_tensor = np.expand_dims(np.transpose(normalized_image, (2, 0, 1)), 0)\n", "    return input_tensor\n", "\n", "\n", "class RetinaFace:\n", "    def __init__(self, ov_model):\n", "        self.taskname = \"detection\"\n", "        
self.ov_model = ov_model\n", " self.center_cache = {}\n", " self.nms_thresh = 0.4\n", " self.det_thresh = 0.5\n", " self._init_vars()\n", "\n", " def _init_vars(self):\n", " self.input_size = (640, 640)\n", " outputs = self.ov_model.outputs\n", " self.input_mean = 127.5\n", " self.input_std = 128.0\n", " # print(self.output_names)\n", " # assert len(outputs)==10 or len(outputs)==15\n", " self.use_kps = False\n", " self._anchor_ratio = 1.0\n", " self._num_anchors = 1\n", " if len(outputs) == 6:\n", " self.fmc = 3\n", " self._feat_stride_fpn = [8, 16, 32]\n", " self._num_anchors = 2\n", " elif len(outputs) == 9:\n", " self.fmc = 3\n", " self._feat_stride_fpn = [8, 16, 32]\n", " self._num_anchors = 2\n", " self.use_kps = True\n", " elif len(outputs) == 10:\n", " self.fmc = 5\n", " self._feat_stride_fpn = [8, 16, 32, 64, 128]\n", " self._num_anchors = 1\n", " elif len(outputs) == 15:\n", " self.fmc = 5\n", " self._feat_stride_fpn = [8, 16, 32, 64, 128]\n", " self._num_anchors = 1\n", " self.use_kps = True\n", "\n", " def prepare(self, **kwargs):\n", " nms_thresh = kwargs.get(\"nms_thresh\", None)\n", " if nms_thresh is not None:\n", " self.nms_thresh = nms_thresh\n", " det_thresh = kwargs.get(\"det_thresh\", None)\n", " if det_thresh is not None:\n", " self.det_thresh = det_thresh\n", " input_size = kwargs.get(\"input_size\", None)\n", " if input_size is not None:\n", " if self.input_size is not None:\n", " print(\"warning: det_size is already set in detection model, ignore\")\n", " else:\n", " self.input_size = input_size\n", "\n", " def forward(self, img, threshold):\n", " scores_list = []\n", " bboxes_list = []\n", " kpss_list = []\n", " blob = prepare_input(img, self.input_mean, self.input_std, True)\n", " net_outs = self.ov_model(blob)\n", "\n", " input_height = blob.shape[2]\n", " input_width = blob.shape[3]\n", " fmc = self.fmc\n", " for idx, stride in enumerate(self._feat_stride_fpn):\n", " scores = net_outs[idx]\n", " bbox_preds = net_outs[idx + fmc]\n", " 
bbox_preds = bbox_preds * stride\n", " if self.use_kps:\n", " kps_preds = net_outs[idx + fmc * 2] * stride\n", " height = input_height // stride\n", " width = input_width // stride\n", " key = (height, width, stride)\n", " if key in self.center_cache:\n", " anchor_centers = self.center_cache[key]\n", " else:\n", " anchor_centers = np.stack(np.mgrid[:height, :width][::-1], axis=-1).astype(np.float32)\n", " anchor_centers = (anchor_centers * stride).reshape((-1, 2))\n", " if self._num_anchors > 1:\n", " anchor_centers = np.stack([anchor_centers] * self._num_anchors, axis=1).reshape((-1, 2))\n", " if len(self.center_cache) < 100:\n", " self.center_cache[key] = anchor_centers\n", "\n", " pos_inds = np.where(scores >= threshold)[0]\n", " bboxes = distance2bbox(anchor_centers, bbox_preds)\n", " pos_scores = scores[pos_inds]\n", " pos_bboxes = bboxes[pos_inds]\n", " scores_list.append(pos_scores)\n", " bboxes_list.append(pos_bboxes)\n", " if self.use_kps:\n", " kpss = distance2kps(anchor_centers, kps_preds)\n", " # kpss = kps_preds\n", " kpss = kpss.reshape((kpss.shape[0], -1, 2))\n", " pos_kpss = kpss[pos_inds]\n", " kpss_list.append(pos_kpss)\n", " return scores_list, bboxes_list, kpss_list\n", "\n", " def detect(self, img, input_size=None, max_num=0, metric=\"default\"):\n", " assert input_size is not None or self.input_size is not None\n", " input_size = self.input_size if input_size is None else input_size\n", "\n", " im_ratio = float(img.shape[0]) / img.shape[1]\n", " model_ratio = float(input_size[1]) / input_size[0]\n", " if im_ratio > model_ratio:\n", " new_height = input_size[1]\n", " new_width = int(new_height / im_ratio)\n", " else:\n", " new_width = input_size[0]\n", " new_height = int(new_width * im_ratio)\n", " det_scale = float(new_height) / img.shape[0]\n", " resized_img = cv2.resize(img, (new_width, new_height))\n", " det_img = np.zeros((input_size[1], input_size[0], 3), dtype=np.uint8)\n", " det_img[:new_height, :new_width, :] = resized_img\n", "\n", " 
scores_list, bboxes_list, kpss_list = self.forward(det_img, self.det_thresh)\n", "\n", " scores = np.vstack(scores_list)\n", " scores_ravel = scores.ravel()\n", " order = scores_ravel.argsort()[::-1]\n", " bboxes = np.vstack(bboxes_list) / det_scale\n", " if self.use_kps:\n", " kpss = np.vstack(kpss_list) / det_scale\n", " pre_det = np.hstack((bboxes, scores)).astype(np.float32, copy=False)\n", " pre_det = pre_det[order, :]\n", " keep = self.nms(pre_det)\n", " det = pre_det[keep, :]\n", " if self.use_kps:\n", " kpss = kpss[order, :, :]\n", " kpss = kpss[keep, :, :]\n", " else:\n", " kpss = None\n", " if max_num > 0 and det.shape[0] > max_num:\n", " area = (det[:, 2] - det[:, 0]) * (det[:, 3] - det[:, 1])\n", " img_center = img.shape[0] // 2, img.shape[1] // 2\n", " offsets = np.vstack(\n", " [\n", " (det[:, 0] + det[:, 2]) / 2 - img_center[1],\n", " (det[:, 1] + det[:, 3]) / 2 - img_center[0],\n", " ]\n", " )\n", " offset_dist_squared = np.sum(np.power(offsets, 2.0), 0)\n", " if metric == \"max\":\n", " values = area\n", " else:\n", " values = area - offset_dist_squared * 2.0 # some extra weight on the centering\n", " bindex = np.argsort(values)[::-1] # some extra weight on the centering\n", " bindex = bindex[0:max_num]\n", " det = det[bindex, :]\n", " if kpss is not None:\n", " kpss = kpss[bindex, :]\n", " return det, kpss\n", "\n", " def nms(self, dets):\n", " thresh = self.nms_thresh\n", " x1 = dets[:, 0]\n", " y1 = dets[:, 1]\n", " x2 = dets[:, 2]\n", " y2 = dets[:, 3]\n", " scores = dets[:, 4]\n", "\n", " areas = (x2 - x1 + 1) * (y2 - y1 + 1)\n", " order = scores.argsort()[::-1]\n", "\n", " keep = []\n", " while order.size > 0:\n", " i = order[0]\n", " keep.append(i)\n", " xx1 = np.maximum(x1[i], x1[order[1:]])\n", " yy1 = np.maximum(y1[i], y1[order[1:]])\n", " xx2 = np.minimum(x2[i], x2[order[1:]])\n", " yy2 = np.minimum(y2[i], y2[order[1:]])\n", "\n", " w = np.maximum(0.0, xx2 - xx1 + 1)\n", " h = np.maximum(0.0, yy2 - yy1 + 1)\n", " inter = w * h\n", " ovr 
= inter / (areas[i] + areas[order[1:]] - inter)\n", "\n", " inds = np.where(ovr <= thresh)[0]\n", " order = order[inds + 1]\n", "\n", " return keep\n", "\n", "\n", "arcface_dst = np.array(\n", " [\n", " [38.2946, 51.6963],\n", " [73.5318, 51.5014],\n", " [56.0252, 71.7366],\n", " [41.5493, 92.3655],\n", " [70.7299, 92.2041],\n", " ],\n", " dtype=np.float32,\n", ")\n", "\n", "\n", "def estimate_norm(lmk, image_size=112, mode=\"arcface\"):\n", " assert lmk.shape == (5, 2)\n", " assert image_size % 112 == 0 or image_size % 128 == 0\n", " if image_size % 112 == 0:\n", " ratio = float(image_size) / 112.0\n", " diff_x = 0\n", " else:\n", " ratio = float(image_size) / 128.0\n", " diff_x = 8.0 * ratio\n", " dst = arcface_dst * ratio\n", " dst[:, 0] += diff_x\n", " tform = trans.SimilarityTransform()\n", " tform.estimate(lmk, dst)\n", " M = tform.params[0:2, :]\n", " return M\n", "\n", "\n", "def norm_crop(img, landmark, image_size=112, mode=\"arcface\"):\n", " M = estimate_norm(landmark, image_size, mode)\n", " warped = cv2.warpAffine(img, M, (image_size, image_size), borderValue=0.0)\n", " return warped\n", "\n", "\n", "class FaceEmbeddings:\n", " def __init__(self, ov_model):\n", " self.ov_model = ov_model\n", " self.taskname = \"recognition\"\n", " input_mean = 127.5\n", " input_std = 127.5\n", " self.input_mean = input_mean\n", " self.input_std = input_std\n", " input_shape = self.ov_model.inputs[0].partial_shape\n", " self.input_size = (input_shape[3].get_length(), input_shape[2].get_length())\n", " self.input_shape = input_shape\n", "\n", " def get(self, img, kps):\n", " aimg = norm_crop(img, landmark=kps, image_size=self.input_size[0])\n", " embedding = self.get_feat(aimg).flatten()\n", " return embedding\n", "\n", " def get_feat(self, imgs):\n", " if not isinstance(imgs, list):\n", " imgs = [imgs]\n", " input_size = self.input_size\n", " blob = np.concatenate([prepare_input(cv2.resize(img, input_size), self.input_mean, self.input_std, True) for img in imgs])\n", 
"\n", " net_out = self.ov_model(blob)[0]\n", " return net_out\n", "\n", " def forward(self, batch_data):\n", " blob = (batch_data - self.input_mean) / self.input_std\n", " net_out = self.ov_model(blob)[0]\n", " return net_out\n", "\n", "\n", "class OVFaceAnalysis:\n", " def __init__(self, detect_model, embedding_model):\n", " self.det_model = RetinaFace(detect_model)\n", " self.embed_model = FaceEmbeddings(embedding_model)\n", "\n", " def get(self, img, max_num=0):\n", " bboxes, kpss = self.det_model.detect(img, max_num=max_num, metric=\"default\")\n", " if bboxes.shape[0] == 0:\n", " return []\n", " ret = []\n", " for i in range(bboxes.shape[0]):\n", " bbox = bboxes[i, 0:4]\n", " det_score = bboxes[i, 4]\n", " kps = None\n", " if kpss is not None:\n", " kps = kpss[i]\n", " embedding = self.embed_model.get(img, kps)\n", " ret.append({\"bbox\": bbox, \"score\": det_score, \"kps\": kps, \"embedding\": embedding})\n", " return ret" ] }, { "attachments": {}, "cell_type": "markdown", "id": "58f01590-d63c-4312-80fb-0ad42c287d47", "metadata": {}, "source": [ "Now, let's see models inference result\n", "\n", "### Select Inference Device for Face Recognition\n", "[back to top ⬆️](#Table-of-contents:)\n", "### Select Inference Device for Face Recognition" ] }, { "cell_type": "code", "execution_count": 6, "id": "d9a6a55f-6e17-443f-a423-3b7d89ad539d", "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "c636605a586a43749aeabe60c391baa7", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import openvino as ov\n", "import ipywidgets as widgets\n", "\n", "core = ov.Core()\n", "\n", "device = widgets.Dropdown(\n", " options=core.available_devices + [\"AUTO\"],\n", " value=\"AUTO\",\n", " description=\"Device:\",\n", " disabled=False,\n", ")\n", "\n", 
"device" ] }, { "cell_type": "code", "execution_count": 7, "id": "1407dafb-eff9-46a4-9b1c-40fc10cc47cf", "metadata": {}, "outputs": [], "source": [ "core = ov.Core()\n", "face_detector = core.compile_model(face_detector_path, device.value)\n", "face_embedding = core.compile_model(face_embeddings_path, device.value)" ] }, { "cell_type": "code", "execution_count": 8, "id": "99da0cde-cfee-4cbf-a392-62c3e6fc1774", "metadata": {}, "outputs": [], "source": [ "app = OVFaceAnalysis(face_detector, face_embedding)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "95d2b544-5f37-4590-81c1-8316b0000059", "metadata": {}, "source": [ "### Perform Face Identity extraction\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "Now, we can apply our `OVFaceAnalysis` pipeline on an image for collection face embeddings and key points for reflection on the generated image" ] }, { "cell_type": "code", "execution_count": null, "id": "207fb678", "metadata": {}, "outputs": [], "source": [ "import PIL.Image\n", "from pipeline_stable_diffusion_xl_instantid import draw_kps\n", "\n", "\n", "def get_face_info(face_image: PIL.Image.Image):\n", " r\"\"\"\n", " Retrieve face information from the input face image.\n", "\n", " Args:\n", " face_image (PIL.Image.Image):\n", " An image containing a face.\n", "\n", " Returns:\n", " face_emb (numpy.ndarray):\n", " Facial embedding extracted from the face image.\n", " face_kps (PIL.Image.Image):\n", " Facial keypoints drawn on the face image.\n", " \"\"\"\n", " face_image = face_image.resize((832, 800))\n", " # prepare face emb\n", " face_info = app.get(cv2.cvtColor(np.array(face_image), cv2.COLOR_RGB2BGR))\n", " if len(face_info) == 0:\n", " raise RuntimeError(\"Couldn't find the face on the image\")\n", " face_info = sorted(\n", " face_info,\n", " key=lambda x: (x[\"bbox\"][2] - x[\"bbox\"][0]) * x[\"bbox\"][3] - x[\"bbox\"][1],\n", " )[\n", " -1\n", " ] # only use the maximum face\n", " face_emb = face_info[\"embedding\"]\n", " face_kps = 
draw_kps(face_image, face_info[\"kps\"])\n", " return face_emb, face_kps" ] }, { "cell_type": "code", "execution_count": 10, "id": "cb3210b1-83c3-4814-ad7e-0a997619c931", "metadata": {}, "outputs": [], "source": [ "from diffusers.utils import load_image\n", "\n", "face_image = load_image(\"https://huggingface.co/datasets/YiYiXu/testing-images/resolve/main/vermeer.jpg\")\n", "\n", "face_emb, face_kps = get_face_info(face_image)" ] }, { "cell_type": "code", "execution_count": 11, "id": "aa87f98c-681e-4668-b91f-f429381a679b", "metadata": {}, "outputs": [ { "data": { "image/jpeg": "", "image/png": "", "text/plain": [ "" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "face_image" ] }, { "cell_type": "code", "execution_count": 12, "id": "ae341903-8410-4d2e-aee5-263f469f3cc2", "metadata": {}, "outputs": [ { "data": { "image/jpeg": "", "image/png": "", "text/plain": [ "" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "face_kps" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e9882719-5f14-47e6-b537-38322d4ef66e", "metadata": {}, "source": [ "## Prepare InstantID pipeline \n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "The code below downloads InstantID pipeline parts - ControlNet for face pose and IP-Adapter for adding face embeddings to prompt" ] }, { "cell_type": "code", "execution_count": 13, "id": "bd206ec7-7dfb-4927-8985-346853bd7f46", "metadata": {}, "outputs": [], "source": [ "from huggingface_hub import hf_hub_download\n", "\n", "hf_hub_download(\n", " repo_id=\"InstantX/InstantID\",\n", " filename=\"ControlNetModel/config.json\",\n", " local_dir=\"./checkpoints\",\n", ")\n", "hf_hub_download(\n", " repo_id=\"InstantX/InstantID\",\n", " filename=\"ControlNetModel/diffusion_pytorch_model.safetensors\",\n", " local_dir=\"./checkpoints\",\n", ")\n", "hf_hub_download(repo_id=\"InstantX/InstantID\", filename=\"ip-adapter.bin\", local_dir=\"./checkpoints\");" ] }, 
{ "attachments": {}, "cell_type": "markdown", "id": "a2e06e46-8a28-443f-a52c-c5d8f280c89d", "metadata": {}, "source": [ "As discussed in the model description, InstantID does not require diffusion model fine-tuning and can be applied to an existing Stable Diffusion pipeline. We will use [`stable-diffusion-xl-base-1.0`](https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0) as the base text-to-image diffusion pipeline. We also apply [LCM LoRA](https://huggingface.co/latent-consistency/lcm-lora-sdxl) to speed up the generation process. Previously, we considered how to convert and run the SDXL model for Text-to-Image and Image-to-Image generation using the Optimum-Intel library (please check out this notebook for [details](../stable-diffusion-xl/stable-diffusion-xl.ipynb)); now we will use it in combination with ControlNet and convert it using the OpenVINO Model Conversion API. " ] }, { "cell_type": "code", "execution_count": 14, "id": "9be23b0d-b8f5-4717-800f-f2151719bf0f", "metadata": {}, "outputs": [], "source": [ "from diffusers.models import ControlNetModel\n", "from diffusers import LCMScheduler\n", "from pipeline_stable_diffusion_xl_instantid import StableDiffusionXLInstantIDPipeline\n", "\n", "import torch\n", "from PIL import Image\n", "import gc\n", "\n", "\n", "ov_controlnet_path = MODELS_DIR / \"controlnet.xml\"\n", "ov_unet_path = MODELS_DIR / \"unet.xml\"\n", "ov_vae_decoder_path = MODELS_DIR / \"vae_decoder.xml\"\n", "ov_text_encoder_path = MODELS_DIR / \"text_encoder.xml\"\n", "ov_text_encoder_2_path = MODELS_DIR / \"text_encoder_2.xml\"\n", "ov_image_proj_encoder_path = MODELS_DIR / \"image_proj_model.xml\"\n", "\n", "required_pipeline_parts = [\n", "    ov_controlnet_path,\n", "    ov_unet_path,\n", "    ov_vae_decoder_path,\n", "    ov_text_encoder_path,\n", "    ov_text_encoder_2_path,\n", "    ov_image_proj_encoder_path,\n", "]\n", "\n", "\n", "def load_pytorch_pipeline(sdxl_id=\"stabilityai/stable-diffusion-xl-base-1.0\"):\n", "    # prepare models under 
./checkpoints\n", " face_adapter = Path(\"checkpoints/ip-adapter.bin\")\n", " controlnet_path = Path(\"checkpoints/ControlNetModel\")\n", "\n", " # load IdentityNet\n", " controlnet = ControlNetModel.from_pretrained(controlnet_path)\n", "\n", " pipe = StableDiffusionXLInstantIDPipeline.from_pretrained(sdxl_id, controlnet=controlnet)\n", "\n", " # load adapter\n", " pipe.load_ip_adapter_instantid(face_adapter)\n", " # load lcm lora\n", " pipe.load_lora_weights(\"latent-consistency/lcm-lora-sdxl\")\n", " pipe.fuse_lora()\n", " scheduler = LCMScheduler.from_config(pipe.scheduler.config)\n", " pipe.set_ip_adapter_scale(0.8)\n", "\n", " controlnet, unet, vae = pipe.controlnet, pipe.unet, pipe.vae\n", " text_encoder, text_encoder_2, tokenizer, tokenizer_2 = (\n", " pipe.text_encoder,\n", " pipe.text_encoder_2,\n", " pipe.tokenizer,\n", " pipe.tokenizer_2,\n", " )\n", " image_proj_model = pipe.image_proj_model\n", " return (\n", " controlnet,\n", " unet,\n", " vae,\n", " text_encoder,\n", " text_encoder_2,\n", " tokenizer,\n", " tokenizer_2,\n", " image_proj_model,\n", " scheduler,\n", " )\n", "\n", "\n", "load_torch_models = any([not path.exists() for path in required_pipeline_parts])\n", "\n", "if load_torch_models:\n", " (\n", " controlnet,\n", " unet,\n", " vae,\n", " text_encoder,\n", " text_encoder_2,\n", " tokenizer,\n", " tokenizer_2,\n", " image_proj_model,\n", " scheduler,\n", " ) = load_pytorch_pipeline()\n", " tokenizer.save_pretrained(MODELS_DIR / \"tokenizer\")\n", " tokenizer_2.save_pretrained(MODELS_DIR / \"tokenizer_2\")\n", " scheduler.save_pretrained(MODELS_DIR / \"scheduler\")\n", "else:\n", " (\n", " controlnet,\n", " unet,\n", " vae,\n", " text_encoder,\n", " text_encoder_2,\n", " tokenizer,\n", " tokenizer_2,\n", " image_proj_model,\n", " scheduler,\n", " ) = (None, None, None, None, None, None, None, None, None)\n", "\n", "gc.collect();" ] }, { "attachments": {}, "cell_type": "markdown", "id": "d3fc5252-df17-45cc-8733-51e759e76cc3", "metadata": {}, 
"source": [ "## Convert InstantID pipeline components to OpenVINO Intermediate Representation format\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "Starting from 2023.0 release, OpenVINO supports PyTorch models conversion directly. We need to provide a model object, input data for model tracing to `ov.convert_model` function to obtain OpenVINO `ov.Model` object instance. Model can be saved on disk for next deployment using `ov.save_model` function.\n", "\n", "The pipeline consists of the following list of important parts:\n", "\n", "* Image Projection model for getting image prompt embeddings. It is similar with IP-Adapter approach described in [this tutorial](../stable-diffusion-ip-adapter/stable-diffusion-ip-adapter.ipynb), but instead of image, it uses face embeddings as input for image prompt encoding.\n", "* Text Encoders for creating text embeddings to generate an image from a text prompt.\n", "* ControlNet for conditioning by face keypoints image for translation face pose on generated image.\n", "* Unet for step-by-step denoising latent image representation.\n", "* Autoencoder (VAE) for decoding latent space to image.\n", "\n", "\n", "### ControlNet\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "ControlNet was introduced in [Adding Conditional Control to Text-to-Image Diffusion Models](https://arxiv.org/abs/2302.05543) paper. It provides a framework that enables support for various spatial contexts such as a depth map, a segmentation map, a scribble, and key points that can serve as additional conditionings to Diffusion models such as Stable Diffusion. In this [tutorial](../controlnet-stable-diffusion/controlnet-stable-diffusion.ipynb) we already considered how to convert and use ControlNet with Stable Diffusion pipeline. The process of usage ControlNet for Stable Diffusion XL remains without changes." 
] }, { "cell_type": "code", "execution_count": 15, "id": "1c91dce2-7c75-405d-a32c-63eb400929e9", "metadata": {}, "outputs": [], "source": [ "import openvino as ov\n", "from functools import partial\n", "\n", "\n", "def cleanup_torchscript_cache():\n", " \"\"\"\n", " Helper for removing cached model representation\n", " \"\"\"\n", " torch._C._jit_clear_class_registry()\n", " torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore()\n", " torch.jit._state._clear_class_state()\n", "\n", "\n", "controlnet_example_input = {\n", " \"sample\": torch.ones((2, 4, 100, 100)),\n", " \"timestep\": torch.tensor(1, dtype=torch.float32),\n", " \"encoder_hidden_states\": torch.randn((2, 77, 2048)),\n", " \"controlnet_cond\": torch.randn((2, 3, 800, 800)),\n", " \"conditioning_scale\": torch.tensor(0.8, dtype=torch.float32),\n", " \"added_cond_kwargs\": {\n", " \"text_embeds\": torch.zeros((2, 1280)),\n", " \"time_ids\": torch.ones((2, 6), dtype=torch.int32),\n", " },\n", "}\n", "\n", "\n", "if not ov_controlnet_path.exists():\n", " controlnet.forward = partial(controlnet.forward, return_dict=False)\n", " with torch.no_grad():\n", " ov_controlnet = ov.convert_model(controlnet, example_input=controlnet_example_input)\n", " ov_controlnet.inputs[-1].get_node().set_element_type(ov.Type.f32)\n", " ov_controlnet.inputs[-1].get_node().set_partial_shape(ov.PartialShape([-1, 6]))\n", " ov_controlnet.validate_nodes_and_infer_types()\n", " ov.save_model(ov_controlnet, ov_controlnet_path)\n", " cleanup_torchscript_cache()\n", " del ov_controlnet\n", " gc.collect()\n", "\n", "if not ov_unet_path.exists():\n", " down_block_res_samples, mid_block_res_sample = controlnet(**controlnet_example_input)\n", "else:\n", " down_block_res_samples, mid_block_res_sample = None, None\n", "\n", "del controlnet\n", "gc.collect();" ] }, { "attachments": {}, "cell_type": "markdown", "id": "eca3fd12-4e06-4652-958f-577e95fed879", "metadata": {}, "source": [ "### Unet\n", "[back to top 
⬆️](#Table-of-contents:)\n", "\n", "Compared with Stable Diffusion, Stable Diffusion XL Unet has an additional input for the `time_ids` condition. As we use ControlNet and Image Projection Model, these models' outputs also contribute to preparing model input for Unet." ] }, { "cell_type": "code", "execution_count": 16, "id": "06c921e5-05f1-4ec8-b712-c3dcd417e390", "metadata": {}, "outputs": [], "source": [ "from typing import Tuple\n", "\n", "\n", "class UnetWrapper(torch.nn.Module):\n", " def __init__(\n", " self,\n", " unet,\n", " sample_dtype=torch.float32,\n", " timestep_dtype=torch.int64,\n", " encoder_hidden_states_dtype=torch.float32,\n", " down_block_additional_residuals_dtype=torch.float32,\n", " mid_block_additional_residual_dtype=torch.float32,\n", " text_embeds_dtype=torch.float32,\n", " time_ids_dtype=torch.int32,\n", " ):\n", " super().__init__()\n", " self.unet = unet\n", " self.sample_dtype = sample_dtype\n", " self.timestep_dtype = timestep_dtype\n", " self.encoder_hidden_states_dtype = encoder_hidden_states_dtype\n", " self.down_block_additional_residuals_dtype = down_block_additional_residuals_dtype\n", " self.mid_block_additional_residual_dtype = mid_block_additional_residual_dtype\n", " self.text_embeds_dtype = text_embeds_dtype\n", " self.time_ids_dtype = time_ids_dtype\n", "\n", " def forward(\n", " self,\n", " sample: torch.Tensor,\n", " timestep: torch.Tensor,\n", " encoder_hidden_states: torch.Tensor,\n", " down_block_additional_residuals: Tuple[torch.Tensor],\n", " mid_block_additional_residual: torch.Tensor,\n", " text_embeds: torch.Tensor,\n", " time_ids: torch.Tensor,\n", " ):\n", " sample.to(self.sample_dtype)\n", " timestep.to(self.timestep_dtype)\n", " encoder_hidden_states.to(self.encoder_hidden_states_dtype)\n", " down_block_additional_residuals = [res.to(self.down_block_additional_residuals_dtype) for res in down_block_additional_residuals]\n", " mid_block_additional_residual.to(self.mid_block_additional_residual_dtype)\n", " 
added_cond_kwargs = {\n", " \"text_embeds\": text_embeds.to(self.text_embeds_dtype),\n", " \"time_ids\": time_ids.to(self.time_ids_dtype),\n", " }\n", "\n", " return self.unet(\n", " sample,\n", " timestep,\n", " encoder_hidden_states,\n", " down_block_additional_residuals=down_block_additional_residuals,\n", " mid_block_additional_residual=mid_block_additional_residual,\n", " added_cond_kwargs=added_cond_kwargs,\n", " )\n", "\n", "\n", "if not ov_unet_path.exists():\n", " unet_example_input = {\n", " \"sample\": torch.ones((2, 4, 100, 100)),\n", " \"timestep\": torch.tensor(1, dtype=torch.float32),\n", " \"encoder_hidden_states\": torch.randn((2, 77, 2048)),\n", " \"down_block_additional_residuals\": down_block_res_samples,\n", " \"mid_block_additional_residual\": mid_block_res_sample,\n", " \"text_embeds\": torch.zeros((2, 1280)),\n", " \"time_ids\": torch.ones((2, 6), dtype=torch.int32),\n", " }\n", " unet = UnetWrapper(unet)\n", " with torch.no_grad():\n", " ov_unet = ov.convert_model(unet, example_input=unet_example_input)\n", " for i in range(3, len(ov_unet.inputs) - 2):\n", " ov_unet.inputs[i].get_node().set_element_type(ov.Type.f32)\n", "\n", " ov_unet.validate_nodes_and_infer_types()\n", " ov.save_model(ov_unet, ov_unet_path)\n", " del ov_unet\n", " cleanup_torchscript_cache()\n", " gc.collect()\n", "\n", "del unet\n", "gc.collect();" ] }, { "attachments": {}, "cell_type": "markdown", "id": "93de1676-05da-44c3-ab09-12fb6b82ea79", "metadata": {}, "source": [ "### VAE Decoder\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "The VAE model has two parts, an encoder and a decoder. The encoder is used to convert the image into a low-dimensional latent representation, which will serve as the input to the U-Net model. The decoder, conversely, transforms the latent representation back into an image.\n", "For the InstantID pipeline, we use the VAE only to decode the image generated by the Unet, so we can skip converting the VAE encoder. 
" ] }, { "cell_type": "code", "execution_count": 17, "id": "0a2f91f9-4f96-4511-be0b-e565dc91ae4d", "metadata": {}, "outputs": [], "source": [ "class VAEDecoderWrapper(torch.nn.Module):\n", " def __init__(self, vae_decoder):\n", " super().__init__()\n", " self.vae = vae_decoder\n", "\n", " def forward(self, latents):\n", " return self.vae.decode(latents)\n", "\n", "\n", "if not ov_vae_decoder_path.exists():\n", " vae_decoder = VAEDecoderWrapper(vae)\n", "\n", " with torch.no_grad():\n", " ov_vae_decoder = ov.convert_model(vae_decoder, example_input=torch.zeros((1, 4, 64, 64)))\n", " ov.save_model(ov_vae_decoder, ov_vae_decoder_path)\n", " del ov_vae_decoder\n", " cleanup_torchscript_cache()\n", " del vae_decoder\n", " gc.collect()\n", "\n", "del vae\n", "gc.collect();" ] }, { "attachments": {}, "cell_type": "markdown", "id": "3ef98554-f6fa-44d5-b776-24199386b2ef", "metadata": {}, "source": [ "### Text Encoders\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "The text-encoder is responsible for transforming the input prompt, for example, \"a photo of an astronaut riding a horse\" into an embedding space that can be understood by the U-Net. It is usually a simple transformer-based encoder that maps a sequence of input tokens to a sequence of latent text embeddings." 
] }, { "cell_type": "code", "execution_count": 18, "id": "7ce333c8-5d72-40f3-9a13-ef183e2fa44b", "metadata": {}, "outputs": [], "source": [ "inputs = {\"input_ids\": torch.ones((1, 77), dtype=torch.long)}\n", "\n", "if not ov_text_encoder_path.exists():\n", " text_encoder.eval()\n", " text_encoder.config.output_hidden_states = True\n", " text_encoder.config.return_dict = False\n", " with torch.no_grad():\n", " ov_text_encoder = ov.convert_model(text_encoder, example_input=inputs)\n", " ov.save_model(ov_text_encoder, ov_text_encoder_path)\n", " del ov_text_encoder\n", " cleanup_torchscript_cache()\n", " gc.collect()\n", "\n", "del text_encoder\n", "gc.collect()\n", "\n", "if not ov_text_encoder_2_path.exists():\n", " text_encoder_2.eval()\n", " text_encoder_2.config.output_hidden_states = True\n", " text_encoder_2.config.return_dict = False\n", " with torch.no_grad():\n", " ov_text_encoder = ov.convert_model(text_encoder_2, example_input=inputs)\n", " ov.save_model(ov_text_encoder, ov_text_encoder_2_path)\n", " del ov_text_encoder\n", " cleanup_torchscript_cache()\n", "del text_encoder_2\n", "gc.collect();" ] }, { "attachments": {}, "cell_type": "markdown", "id": "32eeb488-9af2-4f2a-9371-7750b666864a", "metadata": {}, "source": [ "### Image Projection Model\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "The image projection model is responsible for transforming face embeddings into image prompt embeddings." ] }, { "cell_type": "code", "execution_count": 19, "id": "3b9c12be-3322-4c81-85e7-434e204fe28d", "metadata": {}, "outputs": [], "source": [ "if not ov_image_proj_encoder_path.exists():\n", " with torch.no_grad():\n", " ov_image_encoder = ov.convert_model(image_proj_model, example_input=torch.zeros((2, 1, 512)))\n", " ov.save_model(ov_image_encoder, ov_image_proj_encoder_path)\n", " del ov_image_encoder\n", " cleanup_torchscript_cache()\n", "del image_proj_model\n", "gc.collect();" ] }, { "attachments": {}, "cell_type": "markdown", "id": 
"19b7133c-c4b5-4dfe-a05d-79139d8ae2a0", "metadata": {}, "source": [ "## Prepare OpenVINO InstantID Pipeline\n", "[back to top ⬆️](#Table-of-contents:)" ] }, { "cell_type": "code", "execution_count": 20, "id": "0da019c6-2010-4cfe-884b-ca247b798208", "metadata": {}, "outputs": [], "source": [ "from typing import Any, Callable, Dict, List, Optional, Tuple, Union\n", "\n", "import numpy as np\n", "import torch\n", "from diffusers import StableDiffusionXLControlNetPipeline\n", "from diffusers.image_processor import PipelineImageInput, VaeImageProcessor\n", "from diffusers.pipelines.stable_diffusion_xl import StableDiffusionXLPipelineOutput\n", "\n", "\n", "class OVStableDiffusionXLInstantIDPipeline(StableDiffusionXLControlNetPipeline):\n", " def __init__(\n", " self,\n", " text_encoder,\n", " text_encoder_2,\n", " image_proj_model,\n", " controlnet,\n", " unet,\n", " vae_decoder,\n", " tokenizer,\n", " tokenizer_2,\n", " scheduler,\n", " ):\n", " self.text_encoder = text_encoder\n", " self.text_encoder_2 = text_encoder_2\n", " self.tokenizer = tokenizer\n", " self.tokenizer_2 = tokenizer_2\n", " self.image_proj_model = image_proj_model\n", " self.controlnet = controlnet\n", " self.unet = unet\n", " self.vae_decoder = vae_decoder\n", " self.scheduler = scheduler\n", " self.image_proj_model_in_features = 512\n", " self.vae_scale_factor = 8\n", " self.vae_scaling_factor = 0.13025\n", " self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True)\n", " self.control_image_processor = VaeImageProcessor(\n", " vae_scale_factor=self.vae_scale_factor,\n", " do_convert_rgb=True,\n", " do_normalize=False,\n", " )\n", " self._internal_dict = {}\n", " self._progress_bar_config = {}\n", "\n", " def _encode_prompt_image_emb(self, prompt_image_emb, num_images_per_prompt, do_classifier_free_guidance):\n", " if isinstance(prompt_image_emb, torch.Tensor):\n", " prompt_image_emb = prompt_image_emb.clone().detach()\n", " 
else:\n", " prompt_image_emb = torch.tensor(prompt_image_emb)\n", "\n", " prompt_image_emb = prompt_image_emb.reshape([1, -1, self.image_proj_model_in_features])\n", "\n", " if do_classifier_free_guidance:\n", " prompt_image_emb = torch.cat([torch.zeros_like(prompt_image_emb), prompt_image_emb], dim=0)\n", " else:\n", " prompt_image_emb = torch.cat([prompt_image_emb], dim=0)\n", " prompt_image_emb = self.image_proj_model(prompt_image_emb)[0]\n", "\n", " bs_embed, seq_len, _ = prompt_image_emb.shape\n", " prompt_image_emb = np.tile(prompt_image_emb, (1, num_images_per_prompt, 1))\n", " prompt_image_emb = prompt_image_emb.reshape(bs_embed * num_images_per_prompt, seq_len, -1)\n", "\n", " return prompt_image_emb\n", "\n", " def __call__(\n", " self,\n", " prompt: Union[str, List[str]] = None,\n", " prompt_2: Optional[Union[str, List[str]]] = None,\n", " image: PipelineImageInput = None,\n", " height: Optional[int] = None,\n", " width: Optional[int] = None,\n", " num_inference_steps: int = 50,\n", " guidance_scale: float = 5.0,\n", " negative_prompt: Optional[Union[str, List[str]]] = None,\n", " negative_prompt_2: Optional[Union[str, List[str]]] = None,\n", " num_images_per_prompt: Optional[int] = 1,\n", " eta: float = 0.0,\n", " generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,\n", " latents: Optional[torch.FloatTensor] = None,\n", " prompt_embeds: Optional[torch.FloatTensor] = None,\n", " negative_prompt_embeds: Optional[torch.FloatTensor] = None,\n", " pooled_prompt_embeds: Optional[torch.FloatTensor] = None,\n", " negative_pooled_prompt_embeds: Optional[torch.FloatTensor] = None,\n", " image_embeds: Optional[torch.FloatTensor] = None,\n", " output_type: Optional[str] = \"pil\",\n", " return_dict: bool = True,\n", " cross_attention_kwargs: Optional[Dict[str, Any]] = None,\n", " controlnet_conditioning_scale: Union[float, List[float]] = 1.0,\n", " guess_mode: bool = False,\n", " control_guidance_start: Union[float, List[float]] = 0.0,\n", " 
control_guidance_end: Union[float, List[float]] = 1.0,\n", " original_size: Tuple[int, int] = None,\n", " crops_coords_top_left: Tuple[int, int] = (0, 0),\n", " target_size: Tuple[int, int] = None,\n", " negative_original_size: Optional[Tuple[int, int]] = None,\n", " negative_crops_coords_top_left: Tuple[int, int] = (0, 0),\n", " negative_target_size: Optional[Tuple[int, int]] = None,\n", " clip_skip: Optional[int] = None,\n", " callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,\n", " callback_on_step_end_tensor_inputs: List[str] = [\"latents\"],\n", " # IP adapter\n", " ip_adapter_scale=None,\n", " **kwargs,\n", " ):\n", " r\"\"\"\n", " The call function to the pipeline for generation.\n", "\n", " Args:\n", " prompt (`str` or `List[str]`, *optional*):\n", " The prompt or prompts to guide image generation. If not defined, you need to pass `prompt_embeds`.\n", " prompt_2 (`str` or `List[str]`, *optional*):\n", " The prompt or prompts to be sent to `tokenizer_2` and `text_encoder_2`. If not defined, `prompt` is\n", " used in both text-encoders.\n", " image (`torch.FloatTensor`, `PIL.Image.Image`, `np.ndarray`, `List[torch.FloatTensor]`, `List[PIL.Image.Image]`, `List[np.ndarray]`,\n", " `List[List[torch.FloatTensor]]`, `List[List[np.ndarray]]` or `List[List[PIL.Image.Image]]`):\n", " The ControlNet input condition to provide guidance to the `unet` for generation. If the type is\n", " specified as `torch.FloatTensor`, it is passed to ControlNet as is. `PIL.Image.Image` can also be\n", " accepted as an image. The dimensions of the output image default to `image`'s dimensions. If height\n", " and/or width are passed, `image` is resized accordingly. 
If multiple ControlNets are specified in\n", " `init`, images must be passed as a list such that each element of the list can be correctly batched for\n", " input to a single ControlNet.\n", " height (`int`, *optional*, defaults to `self.unet.config.sample_size * self.vae_scale_factor`):\n", " The height in pixels of the generated image. Anything below 512 pixels won't work well for\n", " [stabilityai/stable-diffusion-xl-base-1.0](https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0)\n", " and checkpoints that are not specifically fine-tuned on low resolutions.\n", " width (`int`, *optional*, defaults to `self.unet.config.sample_size * self.vae_scale_factor`):\n", " The width in pixels of the generated image. Anything below 512 pixels won't work well for\n", " [stabilityai/stable-diffusion-xl-base-1.0](https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0)\n", " and checkpoints that are not specifically fine-tuned on low resolutions.\n", " num_inference_steps (`int`, *optional*, defaults to 50):\n", " The number of denoising steps. More denoising steps usually lead to a higher quality image at the\n", " expense of slower inference.\n", " guidance_scale (`float`, *optional*, defaults to 5.0):\n", " A higher guidance scale value encourages the model to generate images closely linked to the text\n", " `prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`.\n", " negative_prompt (`str` or `List[str]`, *optional*):\n", " The prompt or prompts to guide what to not include in image generation. If not defined, you need to\n", " pass `negative_prompt_embeds` instead. Ignored when not using guidance (`guidance_scale < 1`).\n", " negative_prompt_2 (`str` or `List[str]`, *optional*):\n", " The prompt or prompts to guide what to not include in image generation. This is sent to `tokenizer_2`\n", " and `text_encoder_2`. 
If not defined, `negative_prompt` is used in both text-encoders.\n", " num_images_per_prompt (`int`, *optional*, defaults to 1):\n", " The number of images to generate per prompt.\n", " eta (`float`, *optional*, defaults to 0.0):\n", " Corresponds to parameter eta (η) from the [DDIM](https://arxiv.org/abs/2010.02502) paper. Only applies\n", " to the [`~schedulers.DDIMScheduler`], and is ignored in other schedulers.\n", " generator (`torch.Generator` or `List[torch.Generator]`, *optional*):\n", " A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make\n", " generation deterministic.\n", " latents (`torch.FloatTensor`, *optional*):\n", " Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for image\n", " generation. Can be used to tweak the same generation with different prompts. If not provided, a latents\n", " tensor is generated by sampling using the supplied random `generator`.\n", " prompt_embeds (`torch.FloatTensor`, *optional*):\n", " Pre-generated text embeddings. Can be used to easily tweak text inputs (prompt weighting). If not\n", " provided, text embeddings are generated from the `prompt` input argument.\n", " negative_prompt_embeds (`torch.FloatTensor`, *optional*):\n", " Pre-generated negative text embeddings. Can be used to easily tweak text inputs (prompt weighting). If\n", " not provided, `negative_prompt_embeds` are generated from the `negative_prompt` input argument.\n", " pooled_prompt_embeds (`torch.FloatTensor`, *optional*):\n", " Pre-generated pooled text embeddings. Can be used to easily tweak text inputs (prompt weighting). If\n", " not provided, pooled text embeddings are generated from `prompt` input argument.\n", " negative_pooled_prompt_embeds (`torch.FloatTensor`, *optional*):\n", " Pre-generated negative pooled text embeddings. Can be used to easily tweak text inputs (prompt\n", " weighting). 
If not provided, pooled `negative_prompt_embeds` are generated from `negative_prompt` input\n", " argument.\n", " image_embeds (`torch.FloatTensor`, *optional*):\n", " Pre-generated image embeddings.\n", " output_type (`str`, *optional*, defaults to `\"pil\"`):\n", " The output format of the generated image. Choose between `PIL.Image` or `np.array`.\n", " return_dict (`bool`, *optional*, defaults to `True`):\n", " Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a\n", " plain tuple.\n", " controlnet_conditioning_scale (`float` or `List[float]`, *optional*, defaults to 1.0):\n", " The outputs of the ControlNet are multiplied by `controlnet_conditioning_scale` before they are added\n", " to the residual in the original `unet`. If multiple ControlNets are specified in `init`, you can set\n", " the corresponding scale as a list.\n", " control_guidance_start (`float` or `List[float]`, *optional*, defaults to 0.0):\n", " The percentage of total steps at which the ControlNet starts applying.\n", " control_guidance_end (`float` or `List[float]`, *optional*, defaults to 1.0):\n", " The percentage of total steps at which the ControlNet stops applying.\n", " original_size (`Tuple[int]`, *optional*, defaults to (1024, 1024)):\n", " If `original_size` is not the same as `target_size` the image will appear to be down- or upsampled.\n", " `original_size` defaults to `(height, width)` if not specified. Part of SDXL's micro-conditioning as\n", " explained in section 2.2 of\n", " [https://huggingface.co/papers/2307.01952](https://huggingface.co/papers/2307.01952).\n", " crops_coords_top_left (`Tuple[int]`, *optional*, defaults to (0, 0)):\n", " `crops_coords_top_left` can be used to generate an image that appears to be \"cropped\" from the position\n", " `crops_coords_top_left` downwards. Favorable, well-centered images are usually achieved by setting\n", " `crops_coords_top_left` to (0, 0). 
Part of SDXL's micro-conditioning as explained in section 2.2 of\n", " [https://huggingface.co/papers/2307.01952](https://huggingface.co/papers/2307.01952).\n", " target_size (`Tuple[int]`, *optional*, defaults to (1024, 1024)):\n", " For most cases, `target_size` should be set to the desired height and width of the generated image. If\n", " not specified it will default to `(height, width)`. Part of SDXL's micro-conditioning as explained in\n", " section 2.2 of [https://huggingface.co/papers/2307.01952](https://huggingface.co/papers/2307.01952).\n", " negative_original_size (`Tuple[int]`, *optional*, defaults to (1024, 1024)):\n", " To negatively condition the generation process based on a specific image resolution. Part of SDXL's\n", " micro-conditioning as explained in section 2.2 of\n", " [https://huggingface.co/papers/2307.01952](https://huggingface.co/papers/2307.01952). For more\n", " information, refer to this issue thread: https://github.com/huggingface/diffusers/issues/4208.\n", " negative_crops_coords_top_left (`Tuple[int]`, *optional*, defaults to (0, 0)):\n", " To negatively condition the generation process based on specific crop coordinates. Part of SDXL's\n", " micro-conditioning as explained in section 2.2 of\n", " [https://huggingface.co/papers/2307.01952](https://huggingface.co/papers/2307.01952). For more\n", " information, refer to this issue thread: https://github.com/huggingface/diffusers/issues/4208.\n", " negative_target_size (`Tuple[int]`, *optional*, defaults to (1024, 1024)):\n", " To negatively condition the generation process based on a target image resolution. It should be the same\n", " as the `target_size` for most cases. Part of SDXL's micro-conditioning as explained in section 2.2 of\n", " [https://huggingface.co/papers/2307.01952](https://huggingface.co/papers/2307.01952). 
For more\n", " information, refer to this issue thread: https://github.com/huggingface/diffusers/issues/4208.\n", " clip_skip (`int`, *optional*):\n", " Number of layers to be skipped from CLIP while computing the prompt embeddings. A value of 1 means that\n", " the output of the pre-final layer will be used for computing the prompt embeddings.\n", "\n", " Examples:\n", "\n", " Returns:\n", " [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:\n", " If `return_dict` is `True`, [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] is returned,\n", " otherwise a `tuple` is returned containing the output images.\n", " \"\"\"\n", "\n", " do_classifier_free_guidance = guidance_scale >= 1.0\n", " # align format for control guidance\n", " if not isinstance(control_guidance_start, list) and isinstance(control_guidance_end, list):\n", " control_guidance_start = len(control_guidance_end) * [control_guidance_start]\n", " elif not isinstance(control_guidance_end, list) and isinstance(control_guidance_start, list):\n", " control_guidance_end = len(control_guidance_start) * [control_guidance_end]\n", " elif not isinstance(control_guidance_start, list) and not isinstance(control_guidance_end, list):\n", " control_guidance_start, control_guidance_end = (\n", " [control_guidance_start],\n", " [control_guidance_end],\n", " )\n", "\n", " # 2. 
Define call parameters\n", " if prompt is not None and isinstance(prompt, str):\n", " batch_size = 1\n", " elif prompt is not None and isinstance(prompt, list):\n", " batch_size = len(prompt)\n", " else:\n", " batch_size = prompt_embeds.shape[0]\n", "\n", " (\n", " prompt_embeds,\n", " negative_prompt_embeds,\n", " pooled_prompt_embeds,\n", " negative_pooled_prompt_embeds,\n", " ) = self.encode_prompt(\n", " prompt,\n", " prompt_2,\n", " num_images_per_prompt,\n", " do_classifier_free_guidance,\n", " negative_prompt,\n", " negative_prompt_2,\n", " prompt_embeds=prompt_embeds,\n", " negative_prompt_embeds=negative_prompt_embeds,\n", " pooled_prompt_embeds=pooled_prompt_embeds,\n", " negative_pooled_prompt_embeds=negative_pooled_prompt_embeds,\n", " lora_scale=None,\n", " clip_skip=clip_skip,\n", " )\n", "\n", " # 3.2 Encode image prompt\n", " prompt_image_emb = self._encode_prompt_image_emb(image_embeds, num_images_per_prompt, do_classifier_free_guidance)\n", "\n", " # 4. Prepare image\n", " image = self.prepare_image(\n", " image=image,\n", " width=width,\n", " height=height,\n", " batch_size=batch_size * num_images_per_prompt,\n", " num_images_per_prompt=num_images_per_prompt,\n", " do_classifier_free_guidance=do_classifier_free_guidance,\n", " guess_mode=guess_mode,\n", " )\n", " height, width = image.shape[-2:]\n", "\n", " # 5. Prepare timesteps\n", " self.scheduler.set_timesteps(num_inference_steps)\n", " timesteps = self.scheduler.timesteps\n", "\n", " # 6. Prepare latent variables\n", " num_channels_latents = 4\n", " latents = self.prepare_latents(\n", " int(batch_size) * int(num_images_per_prompt),\n", " int(num_channels_latents),\n", " int(height),\n", " int(width),\n", " dtype=torch.float32,\n", " device=torch.device(\"cpu\"),\n", " generator=generator,\n", " latents=latents,\n", " )\n", "\n", " # 7. 
Prepare extra step kwargs.\n", " extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)\n", " # 7.1 Create tensor stating which controlnets to keep\n", " controlnet_keep = []\n", " for i in range(len(timesteps)):\n", " keeps = [1.0 - float(i / len(timesteps) < s or (i + 1) / len(timesteps) > e) for s, e in zip(control_guidance_start, control_guidance_end)]\n", " controlnet_keep.append(keeps)\n", "\n", " # 7.2 Prepare added time ids & embeddings\n", " if isinstance(image, list):\n", " original_size = original_size or image[0].shape[-2:]\n", " else:\n", " original_size = original_size or image.shape[-2:]\n", " target_size = target_size or (height, width)\n", "\n", " add_text_embeds = pooled_prompt_embeds\n", " if self.text_encoder_2 is None:\n", " text_encoder_projection_dim = pooled_prompt_embeds.shape[-1]\n", " else:\n", " text_encoder_projection_dim = 1280\n", "\n", " add_time_ids = self._get_add_time_ids(\n", " original_size,\n", " crops_coords_top_left,\n", " target_size,\n", " text_encoder_projection_dim=text_encoder_projection_dim,\n", " )\n", "\n", " if negative_original_size is not None and negative_target_size is not None:\n", " negative_add_time_ids = self._get_add_time_ids(\n", " negative_original_size,\n", " negative_crops_coords_top_left,\n", " negative_target_size,\n", " text_encoder_projection_dim=text_encoder_projection_dim,\n", " )\n", " else:\n", " negative_add_time_ids = add_time_ids\n", "\n", " if do_classifier_free_guidance:\n", " prompt_embeds = np.concatenate([negative_prompt_embeds, prompt_embeds], axis=0)\n", " add_text_embeds = np.concatenate([negative_pooled_prompt_embeds, add_text_embeds], axis=0)\n", " add_time_ids = np.concatenate([negative_add_time_ids, add_time_ids], axis=0)\n", "\n", " add_time_ids = np.tile(add_time_ids, (batch_size * num_images_per_prompt, 1))\n", " encoder_hidden_states = np.concatenate([prompt_embeds, prompt_image_emb], axis=1)\n", "\n", " # 8. 
Denoising loop\n", " with self.progress_bar(total=num_inference_steps) as progress_bar:\n", " for i, t in enumerate(timesteps):\n", " # expand the latents if we are doing classifier free guidance\n", " latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents\n", " latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)\n", "\n", " # controlnet(s) inference\n", " control_model_input = latent_model_input\n", "\n", " cond_scale = controlnet_conditioning_scale\n", "\n", " controlnet_outputs = self.controlnet(\n", " [\n", " control_model_input,\n", " t,\n", " prompt_image_emb,\n", " image,\n", " cond_scale,\n", " add_text_embeds,\n", " add_time_ids,\n", " ]\n", " )\n", "\n", " controlnet_additional_blocks = list(controlnet_outputs.values())\n", "\n", " # predict the noise residual\n", " noise_pred = self.unet(\n", " [\n", " latent_model_input,\n", " t,\n", " encoder_hidden_states,\n", " *controlnet_additional_blocks,\n", " add_text_embeds,\n", " add_time_ids,\n", " ]\n", " )[0]\n", "\n", " # perform guidance\n", " if do_classifier_free_guidance:\n", " noise_pred_uncond, noise_pred_text = noise_pred[0], noise_pred[1]\n", " noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)\n", "\n", " # compute the previous noisy sample x_t -> x_t-1\n", " latents = self.scheduler.step(\n", " torch.from_numpy(noise_pred),\n", " t,\n", " latents,\n", " **extra_step_kwargs,\n", " return_dict=False,\n", " )[0]\n", " progress_bar.update()\n", "\n", " if not output_type == \"latent\":\n", " image = self.vae_decoder(latents / self.vae_scaling_factor)[0]\n", " else:\n", " image = latents\n", "\n", " if not output_type == \"latent\":\n", " image = self.image_processor.postprocess(torch.from_numpy(image), output_type=output_type)\n", "\n", " if not return_dict:\n", " return (image,)\n", "\n", " return StableDiffusionXLPipelineOutput(images=image)\n", "\n", " def encode_prompt(\n", " self,\n", " prompt: 
str,\n", " prompt_2: Optional[str] = None,\n", " num_images_per_prompt: int = 1,\n", " do_classifier_free_guidance: bool = True,\n", " negative_prompt: Optional[str] = None,\n", " negative_prompt_2: Optional[str] = None,\n", " prompt_embeds: Optional[torch.FloatTensor] = None,\n", " negative_prompt_embeds: Optional[torch.FloatTensor] = None,\n", " pooled_prompt_embeds: Optional[torch.FloatTensor] = None,\n", " negative_pooled_prompt_embeds: Optional[torch.FloatTensor] = None,\n", " lora_scale: Optional[float] = None,\n", " clip_skip: Optional[int] = None,\n", " ):\n", " r\"\"\"\n", " Encodes the prompt into text encoder hidden states.\n", "\n", " Args:\n", " prompt (`str` or `List[str]`, *optional*):\n", " prompt to be encoded\n", " prompt_2 (`str` or `List[str]`, *optional*):\n", " The prompt or prompts to be sent to the `tokenizer_2` and `text_encoder_2`. If not defined, `prompt` is\n", " used in both text-encoders\n", " num_images_per_prompt (`int`):\n", " number of images that should be generated per prompt\n", " do_classifier_free_guidance (`bool`):\n", " whether to use classifier free guidance or not\n", " negative_prompt (`str` or `List[str]`, *optional*):\n", " The prompt or prompts not to guide the image generation. If not defined, one has to pass\n", " `negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale` is\n", " less than `1`).\n", " negative_prompt_2 (`str` or `List[str]`, *optional*):\n", " The prompt or prompts not to guide the image generation to be sent to `tokenizer_2` and\n", " `text_encoder_2`. If not defined, `negative_prompt` is used in both text-encoders\n", " prompt_embeds (`torch.FloatTensor`, *optional*):\n", " Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. 
If not\n", " provided, text embeddings will be generated from `prompt` input argument.\n", " negative_prompt_embeds (`torch.FloatTensor`, *optional*):\n", " Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt\n", " weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input\n", " argument.\n", " pooled_prompt_embeds (`torch.FloatTensor`, *optional*):\n", " Pre-generated pooled text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting.\n", " If not provided, pooled text embeddings will be generated from `prompt` input argument.\n", " negative_pooled_prompt_embeds (`torch.FloatTensor`, *optional*):\n", " Pre-generated negative pooled text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt\n", " weighting. If not provided, pooled negative_prompt_embeds will be generated from `negative_prompt`\n", " input argument.\n", " lora_scale (`float`, *optional*):\n", " A lora scale that will be applied to all LoRA layers of the text encoder if LoRA layers are loaded.\n", " clip_skip (`int`, *optional*):\n", " Number of layers to be skipped from CLIP while computing the prompt embeddings. 
A value of 1 means that\n", " the output of the pre-final layer will be used for computing the prompt embeddings.\n", " \"\"\"\n", " prompt = [prompt] if isinstance(prompt, str) else prompt\n", "\n", " if prompt is not None:\n", " batch_size = len(prompt)\n", " else:\n", " batch_size = prompt_embeds.shape[0]\n", "\n", " # Define tokenizers and text encoders\n", " tokenizers = [self.tokenizer, self.tokenizer_2] if self.tokenizer is not None else [self.tokenizer_2]\n", " text_encoders = [self.text_encoder, self.text_encoder_2] if self.text_encoder is not None else [self.text_encoder_2]\n", "\n", " if prompt_embeds is None:\n", " prompt_2 = prompt_2 or prompt\n", " prompt_2 = [prompt_2] if isinstance(prompt_2, str) else prompt_2\n", "\n", " # textual inversion: process multi-vector tokens if necessary\n", " prompt_embeds_list = []\n", " prompts = [prompt, prompt_2]\n", " for prompt, tokenizer, text_encoder in zip(prompts, tokenizers, text_encoders):\n", " text_inputs = tokenizer(\n", " prompt,\n", " padding=\"max_length\",\n", " max_length=tokenizer.model_max_length,\n", " truncation=True,\n", " return_tensors=\"pt\",\n", " )\n", "\n", " text_input_ids = text_inputs.input_ids\n", "\n", " prompt_embeds = text_encoder(text_input_ids)\n", "\n", " # We are only ALWAYS interested in the pooled output of the final text encoder\n", " pooled_prompt_embeds = prompt_embeds[0]\n", " hidden_states = list(prompt_embeds.values())[1:]\n", " if clip_skip is None:\n", " prompt_embeds = hidden_states[-2]\n", " else:\n", " # \"2\" because SDXL always indexes from the penultimate layer.\n", " prompt_embeds = hidden_states[-(clip_skip + 2)]\n", "\n", " prompt_embeds_list.append(prompt_embeds)\n", "\n", " prompt_embeds = np.concatenate(prompt_embeds_list, axis=-1)\n", "\n", " # get unconditional embeddings for classifier free guidance\n", " zero_out_negative_prompt = negative_prompt is None\n", " if do_classifier_free_guidance and negative_prompt_embeds is None and 
zero_out_negative_prompt:\n", " negative_prompt_embeds = np.zeros_like(prompt_embeds)\n", " negative_pooled_prompt_embeds = np.zeros_like(pooled_prompt_embeds)\n", " elif do_classifier_free_guidance and negative_prompt_embeds is None:\n", " negative_prompt = negative_prompt or \"\"\n", " negative_prompt_2 = negative_prompt_2 or negative_prompt\n", "\n", " # normalize str to list\n", " negative_prompt = batch_size * [negative_prompt] if isinstance(negative_prompt, str) else negative_prompt\n", " negative_prompt_2 = batch_size * [negative_prompt_2] if isinstance(negative_prompt_2, str) else negative_prompt_2\n", "\n", " uncond_tokens: List[str]\n", " if prompt is not None and type(prompt) is not type(negative_prompt):\n", " raise TypeError(f\"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !=\" f\" {type(prompt)}.\")\n", " elif batch_size != len(negative_prompt):\n", " raise ValueError(\n", " f\"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:\"\n", " f\" {prompt} has batch size {batch_size}. 
Please make sure that passed `negative_prompt` matches\"\n", " \" the batch size of `prompt`.\"\n", " )\n", " else:\n", " uncond_tokens = [negative_prompt, negative_prompt_2]\n", "\n", " negative_prompt_embeds_list = []\n", " for negative_prompt, tokenizer, text_encoder in zip(uncond_tokens, tokenizers, text_encoders):\n", " max_length = prompt_embeds.shape[1]\n", " uncond_input = tokenizer(\n", " negative_prompt,\n", " padding=\"max_length\",\n", " max_length=max_length,\n", " truncation=True,\n", " return_tensors=\"pt\",\n", " )\n", "\n", " negative_prompt_embeds = text_encoder(uncond_input.input_ids)\n", " # We are only ALWAYS interested in the pooled output of the final text encoder\n", " negative_pooled_prompt_embeds = negative_prompt_embeds[0]\n", " hidden_states = list(negative_prompt_embeds.values())[1:]\n", " negative_prompt_embeds = hidden_states[-2]\n", "\n", " negative_prompt_embeds_list.append(negative_prompt_embeds)\n", "\n", " negative_prompt_embeds = np.concatenate(negative_prompt_embeds_list, axis=-1)\n", "\n", " bs_embed, seq_len, _ = prompt_embeds.shape\n", " # duplicate text embeddings for each generation per prompt, using mps friendly method\n", " prompt_embeds = np.tile(prompt_embeds, (1, num_images_per_prompt, 1))\n", " prompt_embeds = prompt_embeds.reshape(bs_embed * num_images_per_prompt, seq_len, -1)\n", "\n", " if do_classifier_free_guidance:\n", " # duplicate unconditional embeddings for each generation per prompt, using mps friendly method\n", " seq_len = negative_prompt_embeds.shape[1]\n", " negative_prompt_embeds = np.tile(negative_prompt_embeds, (1, num_images_per_prompt, 1))\n", " negative_prompt_embeds = negative_prompt_embeds.reshape(batch_size * num_images_per_prompt, seq_len, -1)\n", "\n", " pooled_prompt_embeds = np.tile(pooled_prompt_embeds, (1, num_images_per_prompt)).reshape(bs_embed * num_images_per_prompt, -1)\n", " if do_classifier_free_guidance:\n", " negative_pooled_prompt_embeds = np.tile(negative_pooled_prompt_embeds, 
(1, num_images_per_prompt)).reshape(bs_embed * num_images_per_prompt, -1)\n", "\n", " return (\n", " prompt_embeds,\n", " negative_prompt_embeds,\n", " pooled_prompt_embeds,\n", " negative_pooled_prompt_embeds,\n", " )\n", "\n", " def prepare_image(\n", " self,\n", " image,\n", " width,\n", " height,\n", " batch_size,\n", " num_images_per_prompt,\n", " do_classifier_free_guidance=False,\n", " guess_mode=False,\n", " ):\n", " image = self.control_image_processor.preprocess(image, height=height, width=width).to(dtype=torch.float32)\n", " image_batch_size = image.shape[0]\n", "\n", " if image_batch_size == 1:\n", " repeat_by = batch_size\n", " else:\n", " # image batch size is the same as prompt batch size\n", " repeat_by = num_images_per_prompt\n", "\n", " image = image.repeat_interleave(repeat_by, dim=0)\n", "\n", " if do_classifier_free_guidance and not guess_mode:\n", " image = torch.cat([image] * 2)\n", "\n", " return image\n", "\n", " def _get_add_time_ids(\n", " self,\n", " original_size,\n", " crops_coords_top_left,\n", " target_size,\n", " text_encoder_projection_dim,\n", " ):\n", " add_time_ids = list(original_size + crops_coords_top_left + target_size)\n", " add_time_ids = torch.tensor([add_time_ids])\n", " return add_time_ids" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f8764e2d-b1ae-4d5a-9541-a170a9003de1", "metadata": {}, "source": [ "## Run OpenVINO pipeline inference\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "### Select inference device for InstantID\n", "[back to top ⬆️](#Table-of-contents:)" ] }, { "cell_type": "code", "execution_count": 21, "id": "e33bf6f4-71b5-47a2-9e32-0479df8be454", "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "c636605a586a43749aeabe60c391baa7", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')" ] }, "execution_count": 21, "metadata": {}, "output_type":
"execute_result" } ], "source": [ "device" ] }, { "cell_type": "code", "execution_count": 22, "id": "e111bc7e-fac9-4881-a3d2-f39057f369b6", "metadata": {}, "outputs": [], "source": [ "text_encoder = core.compile_model(ov_text_encoder_path, device.value)\n", "text_encoder_2 = core.compile_model(ov_text_encoder_2_path, device.value)\n", "vae_decoder = core.compile_model(ov_vae_decoder_path, device.value)\n", "unet = core.compile_model(ov_unet_path, device.value)\n", "controlnet = core.compile_model(ov_controlnet_path, device.value)\n", "image_proj_model = core.compile_model(ov_image_proj_encoder_path, device.value)" ] }, { "cell_type": "code", "execution_count": 23, "id": "652b2d05-bd26-46c1-ba09-96f27db970e8", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "The config attributes {'interpolation_type': 'linear', 'skip_prk_steps': True, 'use_karras_sigmas': False} were passed to LCMScheduler, but are not expected and will be ignored. Please verify your scheduler_config.json configuration file.\n" ] } ], "source": [ "from transformers import AutoTokenizer\n", "\n", "tokenizer = AutoTokenizer.from_pretrained(MODELS_DIR / \"tokenizer\")\n", "tokenizer_2 = AutoTokenizer.from_pretrained(MODELS_DIR / \"tokenizer_2\")\n", "scheduler = LCMScheduler.from_pretrained(MODELS_DIR / \"scheduler\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "3772ef85-8419-4f4a-bef7-7fd0061f3e09", "metadata": {}, "source": [ "### Create pipeline\n", "[back to top ⬆️](#Table-of-contents:)" ] }, { "cell_type": "code", "execution_count": 24, "id": "15b05064-d917-41c5-b7f9-82fd1ff67fce", "metadata": {}, "outputs": [], "source": [ "ov_pipe = OVStableDiffusionXLInstantIDPipeline(\n", " text_encoder,\n", " text_encoder_2,\n", " image_proj_model,\n", " controlnet,\n", " unet,\n", " vae_decoder,\n", " tokenizer,\n", " tokenizer_2,\n", " scheduler,\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "id":
"d35af81c-3bc9-4276-ad15-d6359ff1fcb0", "metadata": {}, "source": [ "### Run inference\n", "[back to top ⬆️](#Table-of-contents:)" ] }, { "cell_type": "code", "execution_count": 25, "id": "95dd4b51-fe34-4aa8-99a7-fd995cf4b6bc", "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "2b36efcb22eb4b0790f8b02b1f727969", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/4 [00:00" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "image" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0e23a7fc", "metadata": {}, "source": [ "## Quantization\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "[NNCF](https://github.com/openvinotoolkit/nncf/) enables post-training quantization by adding quantization layers into the model graph and then using a subset of the training dataset to initialize the parameters of these additional quantization layers. Quantized operations are executed in `INT8` instead of `FP32`/`FP16`, making model inference faster.\n", "\n", "In the `OVStableDiffusionXLInstantIDPipeline` structure, the ControlNet and UNet models run in a loop, repeating inference on each diffusion step, while the other parts of the pipeline run only once. Below we show how to optimize the pipeline using [NNCF](https://github.com/openvinotoolkit/nncf/) to reduce memory and computation cost.\n", "\n", "Please select below whether you would like to run quantization to improve model inference speed.\n", "\n", "> **NOTE**: Quantization is a time- and memory-consuming operation. Running the quantization code below may take some time."
] }, { "cell_type": "code", "execution_count": 27, "id": "6a68873d", "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "3685d51e85d244169d53564472e30d9f", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Checkbox(value=True, description='Quantization')" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "skip_for_device = \"GPU\" in device.value\n", "to_quantize = widgets.Checkbox(value=not skip_for_device, description=\"Quantization\", disabled=skip_for_device)\n", "to_quantize" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f47efa7d", "metadata": {}, "source": [ "Let's load `skip magic` extension to skip quantization if `to_quantize` is not selected" ] }, { "cell_type": "code", "execution_count": 28, "id": "d077d0a4", "metadata": {}, "outputs": [], "source": [ "# Fetch `skip_kernel_extension` module\n", "import requests\n", "\n", "r = requests.get(\n", " url=\"https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/skip_kernel_extension.py\",\n", ")\n", "open(\"skip_kernel_extension.py\", \"w\").write(r.text)\n", "\n", "int8_pipe = None\n", "\n", "%load_ext skip_kernel_extension" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c333ceb2", "metadata": {}, "source": [ "### Prepare calibration datasets\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "We use a portion of [`wider_face`](https://huggingface.co/datasets/wider_face) dataset from Hugging Face as calibration data. We use prompts below to guide image generation and to determine what not to include in the resulting image." 
] }, { "cell_type": "code", "execution_count": 29, "id": "bfd90b7c", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "negative_prompts = [\n", " \"blurry unreal occluded\",\n", " \"low contrast disfigured uncentered mangled\",\n", " \"amateur out of frame low quality nsfw\",\n", " \"ugly underexposed jpeg artifacts\",\n", " \"low saturation disturbing content\",\n", " \"overexposed severe distortion\",\n", " \"amateur NSFW\",\n", " \"ugly mutilated out of frame disfigured\",\n", "]\n", "prompts = [\n", " \"a Naruto-style image of a young boy, incorporating dynamic action lines, intense energy effects, and a sense of movement and power\",\n", " \"an anime-style girl, with vibrant, otherworldly colors, fantastical elements, and a sense of awe\",\n", " \"analog film photo of a man. faded film, desaturated, 35mm photo, grainy, vignette, vintage, Kodachrome, Lomography, stained, highly detailed, found footage, masterpiece, best quality\",\n", " \"Apply a staining filter to give the impression of aged, worn-out film while maintaining sharp detail on a portrait of a woman\",\n", " \"a modern picture of a boy an antique feel through selective desaturation, grain addition, and a warm tone, mimicking the style of old photographs\",\n", " \"a dreamy, ethereal portrait of a young girl, featuring soft, pastel colors, a blurred background, and a touch of bokeh\",\n", " \"a dynamic, action-packed image of a boy in motion, using motion blur, panning, and other techniques to convey a sense of speed and energy\",\n", " \"a dramatic, cinematic image of a boy, using color grading, contrast adjustments, and a widescreen aspect ratio, to create a sense of epic scale and grandeur\",\n", " \"a portrait of a woman in the style of Picasso's cubism, featuring fragmented shapes, bold lines, and a vibrant color palette\",\n", " \"an artwork in the style of Picasso's Blue Period, featuring a somber, melancholic portrait of a person, with muted colors, 
elongated forms, and a sense of introspection and contemplation\",\n", "]" ] }, { "cell_type": "code", "execution_count": 32, "id": "79e974fe", "metadata": { "test_replace": { "subset_size = 200": "subset_size = 4" } }, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "import datasets\n", "\n", "num_inference_steps = 4\n", "subset_size = 200\n", "\n", "ov_int8_unet_path = MODELS_DIR / 'unet_optimized.xml'\n", "ov_int8_controlnet_path = MODELS_DIR / 'controlnet_optimized.xml'\n", "\n", "num_samples = int(np.ceil(subset_size / num_inference_steps))\n", "dataset = datasets.load_dataset(\"wider_face\", split=\"train\", streaming=True).shuffle(seed=42)\n", "face_info = []\n", "for batch in dataset:\n", " try:\n", " face_info.append(get_face_info(batch[\"image\"]))\n", " except RuntimeError:\n", " continue\n", " if len(face_info) > num_samples:\n", " break" ] }, { "attachments": {}, "cell_type": "markdown", "id": "dd99e88a", "metadata": {}, "source": [ "\n", "To collect intermediate model inputs for calibration we should customize `CompiledModel`." 
] }, { "cell_type": "code", "execution_count": 33, "id": "1052ff23", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "from tqdm.notebook import tqdm\n", "from transformers import set_seed\n", "\n", "set_seed(42)\n", "\n", "class CompiledModelDecorator(ov.CompiledModel):\n", " def __init__(self, compiled_model: ov.CompiledModel, keep_prob: float = 1.0):\n", " super().__init__(compiled_model)\n", " self.data_cache = []\n", " self.keep_prob = np.clip(keep_prob, 0, 1)\n", "\n", " def __call__(self, *args, **kwargs):\n", " if np.random.rand() <= self.keep_prob:\n", " self.data_cache.append(*args)\n", " return super().__call__(*args, **kwargs)\n", "\n", "\n", "def collect_calibration_data(pipeline, face_info, subset_size):\n", " original_unet = pipeline.unet\n", " pipeline.unet = CompiledModelDecorator(original_unet)\n", " pipeline.set_progress_bar_config(disable=True)\n", "\n", " pbar = tqdm(total=subset_size)\n", " for face_emb, face_kps in face_info:\n", " negative_prompt = np.random.choice(negative_prompts)\n", " prompt = np.random.choice(prompts)\n", " _ = pipeline(\n", " prompt,\n", " image_embeds=face_emb,\n", " image=face_kps,\n", " num_inference_steps=num_inference_steps,\n", " negative_prompt=negative_prompt,\n", " guidance_scale=0.5,\n", " generator=torch.Generator(device=\"cpu\").manual_seed(1749781188)\n", " )\n", " collected_subset_size = len(pipeline.unet.data_cache)\n", " pbar.update(collected_subset_size - pbar.n)\n", "\n", " calibration_dataset = pipeline.unet.data_cache[:subset_size]\n", " pipeline.set_progress_bar_config(disable=False)\n", " pipeline.unet = original_unet\n", " return calibration_dataset\n" ] }, { "cell_type": "code", "execution_count": null, "id": "8d032211", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "if not (ov_int8_unet_path.exists() and ov_int8_controlnet_path.exists()):\n", " unet_calibration_data = collect_calibration_data(ov_pipe, face_info, 
subset_size=subset_size)" ] }, { "cell_type": "code", "execution_count": 35, "id": "fa997281", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "def prepare_controlnet_dataset(pipeline, face_info, unet_calibration_data):\n", " controlnet_calibration_data = []\n", " i = 0\n", " for face_emb, face_kps in face_info:\n", " prompt_image_emb = pipeline._encode_prompt_image_emb(\n", " face_emb, num_images_per_prompt=1, do_classifier_free_guidance=False\n", " )\n", " image = pipeline.prepare_image(\n", " image=face_kps,\n", " width=None,\n", " height=None,\n", " batch_size=1,\n", " num_images_per_prompt=1,\n", " do_classifier_free_guidance=False,\n", " guess_mode=False,\n", " )\n", " for data in unet_calibration_data[i:i+num_inference_steps]:\n", " controlnet_inputs = [data[0], data[1], prompt_image_emb, image, 1.0, data[-2], data[-1]]\n", " controlnet_calibration_data.append(controlnet_inputs)\n", " i += num_inference_steps\n", " return controlnet_calibration_data\n" ] }, { "cell_type": "code", "execution_count": 36, "id": "0f69732b", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "if not ov_int8_controlnet_path.exists():\n", " controlnet_calibration_data = prepare_controlnet_dataset(ov_pipe, face_info, unet_calibration_data)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "6b262146", "metadata": {}, "source": [ "### Run Quantization\n", "[back to top ⬆️](#Table-of-contents:)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f49bb083", "metadata": {}, "source": [ "#### Run ControlNet Quantization\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "Quantization of the first `Convolution` layer impacts the generation results. We recommend using `IgnoredScope` to keep accuracy sensitive layers in FP16 precision." 
 ] }, { "cell_type": "code", "execution_count": null, "id": "e8a4f0ac", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "import nncf\n", "\n", "if not ov_int8_controlnet_path.exists():\n", " controlnet = core.read_model(ov_controlnet_path)\n", " quantized_controlnet = nncf.quantize(\n", " model=controlnet,\n", " calibration_dataset=nncf.Dataset(controlnet_calibration_data),\n", " subset_size=subset_size,\n", " ignored_scope=nncf.IgnoredScope(names=[\"__module.model.conv_in/aten::_convolution/Convolution\"]),\n", " model_type=nncf.ModelType.TRANSFORMER,\n", " )\n", " ov.save_model(quantized_controlnet, ov_int8_controlnet_path)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "b3ce79be", "metadata": {}, "source": [ "#### Run UNet Hybrid Quantization\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "On the one hand, post-training quantization of the UNet model requires more than ~100 GB and leads to an accuracy drop. On the other hand, weight compression alone doesn't improve performance when applied to Stable Diffusion models, because the size of the activations is comparable to that of the weights. That is why we apply quantization in hybrid mode, which means that we quantize: (1) weights of MatMul and Embedding layers and (2) activations of other layers. The steps are the following:\n", "\n", "1. Create a calibration dataset for quantization.\n", "2. Collect operations with weights.\n", "3. Run `nncf.compress_weights()` to compress only the model weights.\n", "4. Run `nncf.quantize()` on the compressed model, with the weighted operations ignored by providing the `ignored_scope` parameter.\n", "5. Save the `INT8` model using the `openvino.save_model()` function."
] }, { "cell_type": "code", "execution_count": 38, "id": "977a1fc0", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "from collections import deque\n", "\n", "def get_operation_const_op(operation, const_port_id: int):\n", " node = operation.input_value(const_port_id).get_node()\n", " queue = deque([node])\n", " constant_node = None\n", " allowed_propagation_types_list = [\"Convert\", \"FakeQuantize\", \"Reshape\"]\n", "\n", " while len(queue) != 0:\n", " curr_node = queue.popleft()\n", " if curr_node.get_type_name() == \"Constant\":\n", " constant_node = curr_node\n", " break\n", " if len(curr_node.inputs()) == 0:\n", " break\n", " if curr_node.get_type_name() in allowed_propagation_types_list:\n", " queue.append(curr_node.input_value(0).get_node())\n", "\n", " return constant_node\n", "\n", "\n", "def is_embedding(node) -> bool:\n", " allowed_types_list = [\"f16\", \"f32\", \"f64\"]\n", " const_port_id = 0\n", " input_tensor = node.input_value(const_port_id)\n", " if input_tensor.get_element_type().get_type_name() in allowed_types_list:\n", " const_node = get_operation_const_op(node, const_port_id)\n", " if const_node is not None:\n", " return True\n", "\n", " return False\n", "\n", "\n", "def collect_ops_with_weights(model):\n", " ops_with_weights = []\n", " for op in model.get_ops():\n", " if op.get_type_name() == \"MatMul\":\n", " constant_node_0 = get_operation_const_op(op, const_port_id=0)\n", " constant_node_1 = get_operation_const_op(op, const_port_id=1)\n", " if constant_node_0 or constant_node_1:\n", " ops_with_weights.append(op.get_friendly_name())\n", " if op.get_type_name() == \"Gather\" and is_embedding(op):\n", " ops_with_weights.append(op.get_friendly_name())\n", "\n", " return ops_with_weights" ] }, { "cell_type": "code", "execution_count": null, "id": "48720e95", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "if not ov_int8_unet_path.exists():\n", " unet = 
core.read_model(ov_unet_path)\n", " unet_ignored_scope = collect_ops_with_weights(unet)\n", " compressed_unet = nncf.compress_weights(unet, ignored_scope=nncf.IgnoredScope(types=['Convolution']))\n", " quantized_unet = nncf.quantize(\n", " model=compressed_unet,\n", " calibration_dataset=nncf.Dataset(unet_calibration_data),\n", " subset_size=subset_size,\n", " model_type=nncf.ModelType.TRANSFORMER,\n", " ignored_scope=nncf.IgnoredScope(names=unet_ignored_scope),\n", " advanced_parameters=nncf.AdvancedQuantizationParameters(smooth_quant_alpha=-1)\n", " )\n", " ov.save_model(quantized_unet, ov_int8_unet_path)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "44c4fc1b", "metadata": {}, "source": [ "#### Run Weights Compression\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "Quantizing the `Text Encoders` and `VAE Decoder` does not significantly improve inference performance but can lead to a substantial degradation of accuracy. Instead, weight compression is applied to these models to reduce their footprint."
] }, { "cell_type": "code", "execution_count": 40, "id": "eef92c31", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "ov_int8_text_encoder_path = MODELS_DIR / 'text_encoder_optimized.xml'\n", "ov_int8_text_encoder_2_path = MODELS_DIR / 'text_encoder_2_optimized.xml'\n", "ov_int8_vae_decoder_path = MODELS_DIR / 'vae_decoder_optimized.xml'\n", "\n", "if not ov_int8_text_encoder_path.exists():\n", " text_encoder = core.read_model(ov_text_encoder_path)\n", " compressed_text_encoder = nncf.compress_weights(text_encoder)\n", " ov.save_model(compressed_text_encoder, ov_int8_text_encoder_path)\n", "\n", "if not ov_int8_text_encoder_2_path.exists():\n", " text_encoder_2 = core.read_model(ov_text_encoder_2_path)\n", " compressed_text_encoder_2 = nncf.compress_weights(text_encoder_2)\n", " ov.save_model(compressed_text_encoder_2, ov_int8_text_encoder_2_path)\n", "\n", "if not ov_int8_vae_decoder_path.exists():\n", " vae_decoder = core.read_model(ov_vae_decoder_path)\n", " compressed_vae_decoder = nncf.compress_weights(vae_decoder)\n", " ov.save_model(compressed_vae_decoder, ov_int8_vae_decoder_path)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "9d61a276", "metadata": {}, "source": [ "Let's compare the images generated by the original and optimized pipelines." 
] }, { "cell_type": "code", "execution_count": 41, "id": "a6b6b594", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "optimized_controlnet = core.compile_model(ov_int8_controlnet_path, device.value)\n", "optimized_unet = core.compile_model(ov_int8_unet_path, device.value)\n", "optimized_text_encoder = core.compile_model(ov_int8_text_encoder_path, device.value)\n", "optimized_text_encoder_2 = core.compile_model(ov_int8_text_encoder_2_path, device.value)\n", "optimized_vae_decoder = core.compile_model(ov_int8_vae_decoder_path, device.value)\n", "\n", "int8_pipe = OVStableDiffusionXLInstantIDPipeline(\n", " optimized_text_encoder,\n", " optimized_text_encoder_2,\n", " image_proj_model,\n", " optimized_controlnet,\n", " optimized_unet,\n", " optimized_vae_decoder,\n", " tokenizer,\n", " tokenizer_2,\n", " scheduler,\n", ")" ] }, { "cell_type": "code", "execution_count": 42, "id": "9b24c888", "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "50f32fa601ab45369e73f446b72e72ac", "version_major": 2, "version_minor": 0 }, "text/plain": [ " 0%| | 0/4 [00:00" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%%skip not $to_quantize.value\n", "\n", "visualize_results(image, int8_image)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "a73d5698", "metadata": {}, "source": [ "### Compare model file sizes\n", "[back to top ⬆️](#Table-of-contents:)" ] }, { "cell_type": "code", "execution_count": null, "id": "e4b014e2", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "unet compression rate: 1.996\n", "controlnet compression rate: 1.995\n", "text_encoder compression rate: 1.992\n", "text_encoder_2 compression rate: 1.995\n", "vae_decoder compression rate: 1.997\n" ] } ], "source": [ "%%skip not $to_quantize.value\n", "\n", "fp16_model_paths = [ov_unet_path, ov_controlnet_path, ov_text_encoder_path, ov_text_encoder_2_path, 
ov_vae_decoder_path]\n", "int8_model_paths = [ov_int8_unet_path, ov_int8_controlnet_path, ov_int8_text_encoder_path, ov_int8_text_encoder_2_path, ov_int8_vae_decoder_path]\n", "\n", "for fp16_path, int8_path in zip(fp16_model_paths, int8_model_paths):\n", " fp16_ir_model_size = fp16_path.with_suffix(\".bin\").stat().st_size\n", " int8_model_size = int8_path.with_suffix(\".bin\").stat().st_size\n", " print(f\"{fp16_path.stem} compression rate: {fp16_ir_model_size / int8_model_size:.3f}\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "8f1298f2", "metadata": {}, "source": [ "### Compare inference time of the FP16 and INT8 pipelines\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "To measure the inference performance of the `FP16` and `INT8` pipelines, we use mean inference time on 5 samples.\n", "\n", "> **NOTE**: For the most accurate performance estimation, it is recommended to run `benchmark_app` in a terminal/command prompt after closing other applications." ] }, { "cell_type": "code", "execution_count": null, "id": "752130d7", "metadata": {}, "outputs": [], "source": [ "%%skip not $to_quantize.value\n", "\n", "import time\n", "\n", "def calculate_inference_time(pipeline, face_info):\n", " inference_time = []\n", " pipeline.set_progress_bar_config(disable=True)\n", " for i in range(5):\n", " face_emb, face_kps = face_info[i]\n", " prompt = np.random.choice(prompts)\n", " negative_prompt = np.random.choice(negative_prompts)\n", " start = time.perf_counter()\n", " _ = pipeline(\n", " prompt,\n", " image_embeds=face_emb,\n", " image=face_kps,\n", " num_inference_steps=4,\n", " negative_prompt=negative_prompt,\n", " guidance_scale=0.5,\n", " generator=torch.Generator(device=\"cpu\").manual_seed(1749781188)\n", " )\n", " end = time.perf_counter()\n", " delta = end - start\n", " inference_time.append(delta)\n", " pipeline.set_progress_bar_config(disable=False)\n", " return np.mean(inference_time)" ] }, { "cell_type": "code", "execution_count": null, 
"id": "1ce11624", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "FP16 pipeline: 17.595 seconds\n", "INT8 pipeline: 15.258 seconds\n", "Performance speed-up: 1.153\n" ] } ], "source": [ "%%skip not $to_quantize.value\n", "\n", "fp_latency = calculate_inference_time(ov_pipe, face_info)\n", "print(f\"FP16 pipeline: {fp_latency:.3f} seconds\")\n", "int8_latency = calculate_inference_time(int8_pipe, face_info)\n", "print(f\"INT8 pipeline: {int8_latency:.3f} seconds\")\n", "print(f\"Performance speed-up: {fp_latency / int8_latency:.3f}\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "29c60b3d-c990-494c-89c3-9e0beaeabc33", "metadata": {}, "source": [ "## Interactive demo\n", "[back to top ⬆️](#Table-of-contents:)\n", "\n", "Please select below whether you would like to use the quantized models to launch the interactive demo." ] }, { "cell_type": "code", "execution_count": null, "id": "ef7d4415", "metadata": {}, "outputs": [], "source": [ "quantized_models_present = int8_pipe is not None\n", "\n", "use_quantized_models = widgets.Checkbox(\n", " value=quantized_models_present,\n", " description=\"Use quantized models\",\n", " disabled=not quantized_models_present,\n", ")\n", "\n", "use_quantized_models" ] }, { "cell_type": "code", "execution_count": null, "id": "63adc8fe-1965-4458-aa5d-a9a6ac676801", "metadata": {}, "outputs": [], "source": [ "import gradio as gr\n", "from typing import Tuple\n", "import random\n", "import PIL\n", "import sys\n", "\n", "sys.path.append(\"./InstantID/gradio_demo\")\n", "\n", "from style_template import styles\n", "\n", "# global variable\n", "MAX_SEED = np.iinfo(np.int32).max\n", "STYLE_NAMES = list(styles.keys())\n", "DEFAULT_STYLE_NAME = \"Watercolor\"\n", "\n", "\n", "example_image_urls = [\n", " \"https://huggingface.co/datasets/EnD-Diffusers/AI_Faces/resolve/main/00002-3104853212.png\",\n", " 
\"https://huggingface.co/datasets/EnD-Diffusers/AI_Faces/resolve/main/images%207/00171-2728008415.png\",\n", " \"https://huggingface.co/datasets/EnD-Diffusers/AI_Faces/resolve/main/00003-3962843561.png\",\n", " \"https://huggingface.co/datasets/EnD-Diffusers/AI_Faces/resolve/main/00005-3104853215.png\",\n", " \"https://huggingface.co/datasets/YiYiXu/testing-images/resolve/main/ai_face2.png\",\n", "]\n", "\n", "examples_dir = Path(\"examples\")\n", "\n", "examples = [\n", " [examples_dir / \"face_0.png\", \"A woman in red dress\", \"Film Noir\", \"\"],\n", " [examples_dir / \"face_1.png\", \"photo of a business lady\", \"Vibrant Color\", \"\"],\n", " [examples_dir / \"face_2.png\", \"famous rock star poster\", \"(No style)\", \"\"],\n", " [examples_dir / \"face_3.png\", \"a person\", \"Neon\", \"\"],\n", " [examples_dir / \"face_4.png\", \"a girl\", \"Snow\", \"\"],\n", "]\n", "\n", "pipeline = int8_pipe if use_quantized_models.value else ov_pipe\n", "\n", "\n", "if not examples_dir.exists():\n", " examples_dir.mkdir()\n", " for img_id, img_url in enumerate(example_image_urls):\n", " load_image(img_url).save(examples_dir / f\"face_{img_id}.png\")\n", "\n", "\n", "def randomize_seed_fn(seed: int, randomize_seed: bool) -> int:\n", " if randomize_seed:\n", " seed = random.randint(0, MAX_SEED)\n", " return seed\n", "\n", "\n", "def convert_from_cv2_to_image(img: np.ndarray) -> PIL.Image:\n", " return Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))\n", "\n", "\n", "def convert_from_image_to_cv2(img: PIL.Image) -> np.ndarray:\n", " return cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)\n", "\n", "\n", "def resize_img(\n", " input_image,\n", " max_side=1024,\n", " min_side=800,\n", " size=None,\n", " pad_to_max_side=False,\n", " mode=PIL.Image.BILINEAR,\n", " base_pixel_number=64,\n", "):\n", " w, h = input_image.size\n", " if size is not None:\n", " w_resize_new, h_resize_new = size\n", " else:\n", " ratio = min_side / min(h, w)\n", " w, h = round(ratio * w), 
round(ratio * h)\n", " ratio = max_side / max(h, w)\n", " input_image = input_image.resize([round(ratio * w), round(ratio * h)], mode)\n", " w_resize_new = (round(ratio * w) // base_pixel_number) * base_pixel_number\n", " h_resize_new = (round(ratio * h) // base_pixel_number) * base_pixel_number\n", " input_image = input_image.resize([w_resize_new, h_resize_new], mode)\n", "\n", " if pad_to_max_side:\n", " res = np.ones([max_side, max_side, 3], dtype=np.uint8) * 255\n", " offset_x = (max_side - w_resize_new) // 2\n", " offset_y = (max_side - h_resize_new) // 2\n", " res[offset_y : offset_y + h_resize_new, offset_x : offset_x + w_resize_new] = np.array(input_image)\n", " input_image = Image.fromarray(res)\n", " return input_image\n", "\n", "\n", "def apply_style(style_name: str, positive: str, negative: str = \"\") -> Tuple[str, str]:\n", " p, n = styles.get(style_name, styles[DEFAULT_STYLE_NAME])\n", " return p.replace(\"{prompt}\", positive), n + \" \" + negative\n", "\n", "\n", "def generate_image(\n", " face_image,\n", " pose_image,\n", " prompt,\n", " negative_prompt,\n", " style_name,\n", " num_steps,\n", " identitynet_strength_ratio,\n", " guidance_scale,\n", " seed,\n", " progress=gr.Progress(track_tqdm=True),\n", "):\n", " if prompt is None:\n", " prompt = \"a person\"\n", "\n", " # apply the style template\n", " prompt, negative_prompt = apply_style(style_name, prompt, negative_prompt)\n", "\n", " # face_image = load_image(face_image_path)\n", " face_image = resize_img(face_image)\n", " face_image_cv2 = convert_from_image_to_cv2(face_image)\n", " height, width, _ = face_image_cv2.shape\n", "\n", " # Extract face features\n", " face_info = app.get(face_image_cv2)\n", "\n", " if len(face_info) == 0:\n", " raise gr.Error(\"Cannot find any face in the image! 
Please upload another person image\")\n", "\n", " face_info = sorted(\n", " face_info,\n", " key=lambda x: (x[\"bbox\"][2] - x[\"bbox\"][0]) * (x[\"bbox\"][3] - x[\"bbox\"][1]),\n", " )[\n", " -1\n", " ] # use only the largest face (by bounding-box area)\n", " face_emb = face_info[\"embedding\"]\n", " face_kps = draw_kps(convert_from_cv2_to_image(face_image_cv2), face_info[\"kps\"])\n", "\n", " if pose_image is not None:\n", " # pose_image = load_image(pose_image_path)\n", " pose_image = resize_img(pose_image)\n", " pose_image_cv2 = convert_from_image_to_cv2(pose_image)\n", "\n", " face_info = app.get(pose_image_cv2)\n", "\n", " if len(face_info) == 0:\n", " raise gr.Error(\"Cannot find any face in the reference image! Please upload another person image\")\n", "\n", " face_info = face_info[-1]\n", " face_kps = draw_kps(pose_image, face_info[\"kps\"])\n", "\n", " width, height = face_kps.size\n", "\n", " generator = torch.Generator(device=\"cpu\").manual_seed(seed)\n", "\n", " print(\"Start inference...\")\n", " print(f\"[Debug] Prompt: {prompt}, \\n[Debug] Neg Prompt: {negative_prompt}\")\n", " images = pipeline(\n", " prompt=prompt,\n", " negative_prompt=negative_prompt,\n", " image_embeds=face_emb,\n", " image=face_kps,\n", " controlnet_conditioning_scale=float(identitynet_strength_ratio),\n", " num_inference_steps=num_steps,\n", " guidance_scale=guidance_scale,\n", " height=height,\n", " width=width,\n", " generator=generator,\n", " ).images\n", "\n", " return images[0]\n", "\n", "\n", "### Description\n", "title = r\"\"\"\n", "

InstantID: Zero-shot Identity-Preserving Generation

\n", "\"\"\"\n", "\n", "description = r\"\"\"\n", "\n", " How to use:
\n", " 1. Upload an image with a face. For images with multiple faces, we will only detect the largest face. Ensure the face is not too small and is clearly visible without significant obstructions or blurring.\n", " 2. (Optional) You can upload another image as a reference for the face pose. If you don't, we will use the first detected face image to extract facial landmarks. If you use a cropped face at step 1, it is recommended to upload it to define a new face pose.\n", " 3. Enter a text prompt, as done in normal text-to-image models.\n", " 4. Click the Submit button to begin customization.\n", " 5. Share your customized photo with your friends and enjoy! 😊\n", " \"\"\"\n", "\n", "\n", "css = \"\"\"\n", " .gradio-container {width: 85% !important}\n", " \"\"\"\n", "with gr.Blocks(css=css) as demo:\n", " # description\n", " gr.Markdown(title)\n", " gr.Markdown(description)\n", "\n", " with gr.Row():\n", " with gr.Column():\n", " # upload face image\n", " face_file = gr.Image(label=\"Upload a photo of your face\", type=\"pil\")\n", "\n", " # optional: upload a reference pose image\n", " pose_file = gr.Image(label=\"Upload a reference pose image (optional)\", type=\"pil\")\n", "\n", " # prompt\n", " prompt = gr.Textbox(\n", " label=\"Prompt\",\n", " info=\"Give simple prompt is enough to achieve good face fidelity\",\n", " placeholder=\"A photo of a person\",\n", " value=\"\",\n", " )\n", "\n", " submit = gr.Button(\"Submit\", variant=\"primary\")\n", " style = gr.Dropdown(label=\"Style template\", choices=STYLE_NAMES, value=DEFAULT_STYLE_NAME)\n", "\n", " # strength\n", " identitynet_strength_ratio = gr.Slider(\n", " label=\"IdentityNet strength (for fidelity)\",\n", " minimum=0,\n", " maximum=1.5,\n", " step=0.05,\n", " value=0.80,\n", " )\n", "\n", " with gr.Accordion(open=False, label=\"Advanced Options\"):\n", " negative_prompt = gr.Textbox(\n", " label=\"Negative Prompt\",\n", " placeholder=\"low quality\",\n", " value=\"(lowres, low quality, worst 
quality:1.2), (text:1.2), watermark, (frame:1.2), deformed, ugly, deformed eyes, blur, out of focus, blurry, deformed cat, deformed, photo, anthropomorphic cat, monochrome, pet collar, gun, weapon, blue, 3d, drones, drone, buildings in background, green\",\n", " )\n", " num_steps = gr.Slider(\n", " label=\"Number of sample steps\",\n", " minimum=1,\n", " maximum=10,\n", " step=1,\n", " value=4,\n", " )\n", " guidance_scale = gr.Slider(label=\"Guidance scale\", minimum=0.0, maximum=10.0, step=0.1, value=0)\n", " seed = gr.Slider(\n", " label=\"Seed\",\n", " minimum=0,\n", " maximum=MAX_SEED,\n", " step=1,\n", " value=42,\n", " )\n", " randomize_seed = gr.Checkbox(label=\"Randomize seed\", value=True)\n", " gr.Examples(\n", " examples=examples,\n", " inputs=[face_file, prompt, style, negative_prompt],\n", " )\n", "\n", " with gr.Column():\n", " gallery = gr.Image(label=\"Generated Image\")\n", "\n", " submit.click(\n", " fn=randomize_seed_fn,\n", " inputs=[seed, randomize_seed],\n", " outputs=seed,\n", " api_name=False,\n", " ).then(\n", " fn=generate_image,\n", " inputs=[\n", " face_file,\n", " pose_file,\n", " prompt,\n", " negative_prompt,\n", " style,\n", " num_steps,\n", " identitynet_strength_ratio,\n", " guidance_scale,\n", " seed,\n", " ],\n", " outputs=[gallery],\n", " )" ] }, { "cell_type": "code", "execution_count": null, "id": "89d169ba-369d-49d5-b89e-6eb38079ae1a", "metadata": {}, "outputs": [], "source": [ "if __name__ == \"__main__\":\n", " try:\n", " demo.launch(debug=True)\n", " except Exception:\n", " demo.launch(share=True, debug=True)\n", "# if you are launching remotely, specify server_name and server_port\n", "# demo.launch(server_name='your server name', server_port='server port in int')\n", "# Read more in the docs: https://gradio.app/docs/" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, 
"file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" }, "openvino_notebooks": { "imageUrl": "https://github.com/openvinotoolkit/openvino_notebooks/assets/29454499/79d9d244-8c65-4564-9baa-80a73c4674bf", "tags": { "categories": [ "Model Demos", "AI Trends" ], "libraries": [], "other": [ "Stable Diffusion" ], "tasks": [ "Image-to-Image", "Text-to-Image" ] } }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 5 }