"""Single-image inference helpers: image preprocessing, region masks and prompting.

The module loads a model through ``load_pretrained_model`` and exposes
``infer_single_prompt`` / ``infer_ui_task`` as entry points.
"""
import ast
import math
from functools import partial
from typing import Optional, Callable

import numpy as np
import torch
from PIL import Image

from conversation import conv_templates
from builder import load_pretrained_model

# NOTE(review): the placeholder literals below were empty strings in the
# reviewed copy of this file, which looks like "<...>" markup having been
# stripped (an empty token makes `'' in prompt` always true and
# `prompt.split('')` raise ValueError). They are restored to the conventional
# LLaVA/Ferret values -- TODO confirm against the model's own constants.
DEFAULT_REGION_FEA_TOKEN = "<region_fea>"
DEFAULT_IMAGE_TOKEN = "<image>"
DEFAULT_IM_START_TOKEN = "<im_start>"
DEFAULT_IM_END_TOKEN = "<im_end>"
# Placeholder in the prompt that is swapped for the textual box coordinates.
# NOTE(review): also restored from an empty literal -- TODO confirm.
REGION_PLACEHOLDER_TOKEN = "<bbox_location0>"

VOCAB_IMAGE_W = 1000  # 224
VOCAB_IMAGE_H = 1000  # 224
IMAGE_TOKEN_INDEX = -200

# define the task categories
box_in_tasks = ['widgetcaptions', 'taperception', 'ocr', 'icon_recognition', 'widget_classification', 'example_0']
box_out_tasks = ['widget_listing', 'find_text', 'find_icons', 'find_widget', 'conversation_interaction']
no_box_tasks = ['screen2words', 'detailed_description', 'conversation_perception', 'gpt4']


def get_bbox_coor(box, ratio_w, ratio_h):
    """Scale an ``(x1, y1, x2, y2)`` box by ``ratio_w`` (x values) and ``ratio_h`` (y values)."""
    return box[0] * ratio_w, box[1] * ratio_h, box[2] * ratio_w, box[3] * ratio_h


def tokenizer_image_token(prompt, tokenizer, image_token_index=IMAGE_TOKEN_INDEX, return_tensors=None):
    """Tokenize ``prompt``, inserting ``image_token_index`` where the image token occurs.

    Args:
        prompt (str): Prompt text, possibly containing ``DEFAULT_IMAGE_TOKEN``.
        tokenizer: Callable whose result exposes ``.input_ids``.
        image_token_index (int): Id inserted between the text chunks that
            surround each image token.
        return_tensors: Accepted for signature compatibility; currently
            ignored, a plain list is always returned.

    Returns:
        list: Token ids.
    """
    if DEFAULT_IMAGE_TOKEN not in prompt:
        return tokenizer(prompt).input_ids

    # NOTE(review): every chunk is tokenized independently, so any special
    # tokens the tokenizer adds per call would be repeated per chunk -- confirm.
    prompt_chunks = [tokenizer(chunk).input_ids for chunk in prompt.split(DEFAULT_IMAGE_TOKEN)]
    input_ids = []
    for i, chunk in enumerate(prompt_chunks):
        if i > 0:
            input_ids.append(image_token_index)
        input_ids.extend(chunk)
    return input_ids


def expand2square(pil_img, background_color):
    """Pad ``pil_img`` to a centered square canvas filled with ``background_color``.

    A square input is returned unchanged (same object).
    """
    width, height = pil_img.size
    if width == height:
        return pil_img

    side = max(width, height)
    result = Image.new(pil_img.mode, (side, side), background_color)
    # Center along the shorter axis; the longer axis already fills the canvas.
    if width > height:
        result.paste(pil_img, (0, (width - height) // 2))
    else:
        result.paste(pil_img, ((height - width) // 2, 0))
    return result


def select_best_resolution(original_size, possible_resolutions):
    """
    Selects the best resolution from a list of possible resolutions based on the original size.

    The candidate keeping the most effective pixels (after an aspect-preserving
    fit, capped at the original pixel count) wins; ties are broken by the
    least wasted canvas area.

    Args:
        original_size (tuple): The original size of the image in the format (width, height).
        possible_resolutions (list): A list of possible resolutions in the format
            [(width1, height1), (width2, height2), ...].

    Returns:
        tuple: The best fit resolution in the format (width, height), or
        ``None`` when no candidate yields a positive effective resolution.
    """
    original_width, original_height = original_size
    best_fit = None
    max_effective_resolution = 0
    min_wasted_resolution = float('inf')

    for width, height in possible_resolutions:
        scale = min(width / original_width, height / original_height)
        downscaled_width = int(original_width * scale)
        downscaled_height = int(original_height * scale)
        effective_resolution = min(downscaled_width * downscaled_height,
                                   original_width * original_height)
        wasted_resolution = (width * height) - effective_resolution

        is_better = effective_resolution > max_effective_resolution
        is_tie_with_less_waste = (effective_resolution == max_effective_resolution
                                  and wasted_resolution < min_wasted_resolution)
        if is_better or is_tie_with_less_waste:
            max_effective_resolution = effective_resolution
            min_wasted_resolution = wasted_resolution
            best_fit = (width, height)

    return best_fit


def divide_to_patches(image, patch_size):
    """
    Divides an image into patches of a specified size.

    Patches are produced row by row (top to bottom, left to right).

    Args:
        image (PIL.Image.Image): The input image.
        patch_size (int): The size of each patch.

    Returns:
        list: A list of PIL.Image.Image objects representing the patches.
    """
    width, height = image.size
    return [
        image.crop((left, top, left + patch_size, top + patch_size))
        for top in range(0, height, patch_size)
        for left in range(0, width, patch_size)
    ]


def resize_and_pad_image(image, target_resolution, is_pad=False):
    """
    Resize an image to a target resolution, optionally keeping aspect ratio by padding.

    Args:
        image (PIL.Image.Image): The input image.
        target_resolution (tuple): The target resolution (width, height) of the image.
        is_pad (bool): When True, keep the aspect ratio and center the resized
            image on a black RGB canvas of the target size. When False, stretch
            the image directly to the target size.

    Returns:
        PIL.Image.Image: The resized (and, if requested, padded) image.
    """
    original_width, original_height = image.size
    target_width, target_height = target_resolution

    if not is_pad:
        return image.resize((target_width, target_height))

    scale_w = target_width / original_width
    scale_h = target_height / original_height
    if scale_w < scale_h:
        new_width = target_width
        new_height = min(math.ceil(original_height * scale_w), target_height)
    else:
        new_height = target_height
        new_width = min(math.ceil(original_width * scale_h), target_width)

    # Resize the image
    resized_image = image.resize((new_width, new_height))

    new_image = Image.new('RGB', (target_width, target_height), (0, 0, 0))
    paste_x = (target_width - new_width) // 2
    paste_y = (target_height - new_height) // 2
    new_image.paste(resized_image, (paste_x, paste_y))
    return new_image


def process_anyres_image(image, processor, grid_pinpoints, image_process_func: Optional[Callable] = None):
    """
    Process an image with variable resolutions.

    Args:
        image (PIL.Image.Image): The input image to be processed.
        processor: The image processor object. ``processor.crop_size['height']``
            is used as the patch size; ``processor.size['shortest_edge']`` is
            used for the global view when ``image_process_func`` is not given.
        grid_pinpoints (list | str): Possible resolutions, either as a list or
            as a string representation of one.
        image_process_func: Optional ``functools.partial`` used to preprocess
            each view. Its ``keywords['size']`` is read as ``(height, width)``
            for the global view.

    Returns:
        torch.Tensor: A tensor containing the processed image patches, with
        the resized full image first.
    """
    if isinstance(grid_pinpoints, list):
        possible_resolutions = grid_pinpoints
    else:
        possible_resolutions = ast.literal_eval(grid_pinpoints)

    best_resolution = select_best_resolution(image.size, possible_resolutions)
    # FIXME: not sure if do_pad or undo_pad may affect the referring side
    image_padded = resize_and_pad_image(image, best_resolution, is_pad=False)
    patches = divide_to_patches(image_padded, processor.crop_size['height'])

    if image_process_func:
        resized_image_h, resized_image_w = image_process_func.keywords['size']
        image_original_resize = image.resize((resized_image_w, resized_image_h))
        image_patches = [image_original_resize] + patches
        image_patches = [image_process_func(image_patch)['pixel_values'][0]
                         for image_patch in image_patches]
    else:
        shortest_edge = processor.size['shortest_edge']
        image_original_resize = image.resize((shortest_edge, shortest_edge))
        image_patches = [image_original_resize] + patches
        image_patches = [processor.preprocess(image_patch, return_tensors='pt')['pixel_values'][0]
                         for image_patch in image_patches]

    return torch.stack(image_patches, dim=0)


def process_images(images, image_processor, model_cfg, image_process_func: Optional[Callable] = None):
    """Preprocess ``images`` according to ``model_cfg.image_aspect_ratio``.

    * ``'pad'``: each image is padded to a square with the processor's mean
      colour, then preprocessed.
    * ``'anyres'``: each image goes through ``process_anyres_image``.
    * anything else: the processor is applied to the whole batch directly.

    Returns:
        A stacked tensor when all per-image results share a shape; otherwise
        a list of per-image tensors.
    """
    image_aspect_ratio = getattr(model_cfg, "image_aspect_ratio", None)
    new_images = []

    if image_aspect_ratio == 'pad':
        background = tuple(int(x * 255) for x in image_processor.image_mean)
        for image in images:
            image = expand2square(image, background)
            image = image_processor.preprocess(image, return_tensors='pt')['pixel_values'][0]
            new_images.append(image)
    elif image_aspect_ratio == "anyres":
        for image in images:
            image = process_anyres_image(image, image_processor, model_cfg.image_grid_pinpoints,
                                         image_process_func=image_process_func)
            new_images.append(image)
    else:
        return image_processor(images, return_tensors='pt')['pixel_values']

    if all(x.shape == new_images[0].shape for x in new_images):
        new_images = torch.stack(new_images, dim=0)
    return new_images


# function to generate the mask
def generate_mask_for_feature(coor, raw_w, raw_h, mask=None):
    """
    Generates a region mask based on provided coordinates.
    Handles both point and box input.

    Args:
        coor: ``(x, y)`` for a point or ``(x1, y1, x2, y2)`` for a box
            (box bounds are inclusive).
        raw_w (int): Size of the mask's first axis.
        raw_h (int): Size of the mask's second axis.
        mask: Optional array of shape ``(raw_w, raw_h)`` multiplied into a
            box mask.

    Returns:
        torch.Tensor: A ``(raw_w, raw_h)`` tensor with ones inside the region.

    Raises:
        AssertionError: If ``mask`` has the wrong shape or the resulting mask
            is empty (including when ``coor`` has neither 2 nor 4 entries).
    """
    if mask is not None:
        assert mask.shape[0] == raw_w and mask.shape[1] == raw_h
    coor_mask = np.zeros((raw_w, raw_h))

    # if it's a point (2 coordinates)
    if len(coor) == 2:
        span = 5  # Define the span for the point
        x_min = max(0, coor[0] - span)
        x_max = min(raw_w, coor[0] + span + 1)
        y_min = max(0, coor[1] - span)
        y_max = min(raw_h, coor[1] + span + 1)
        coor_mask[int(x_min):int(x_max), int(y_min):int(y_max)] = 1
        assert (coor_mask == 1).any(), f"coor: {coor}, raw_w: {raw_w}, raw_h: {raw_h}"

    # if it's a box (4 coordinates)
    elif len(coor) == 4:
        # Cast to int (float coordinates cannot be used as slice bounds) and
        # clamp to the mask so negative values do not wrap around.
        x_start = min(raw_w, max(0, int(coor[0])))
        y_start = min(raw_h, max(0, int(coor[1])))
        x_stop = min(raw_w, max(0, int(coor[2]) + 1))
        y_stop = min(raw_h, max(0, int(coor[3]) + 1))
        coor_mask[x_start:x_stop, y_start:y_stop] = 1
        if mask is not None:
            coor_mask = coor_mask * mask

    # convert to torch tensor and ensure it contains non-zero values
    coor_mask = torch.from_numpy(coor_mask)
    assert len(coor_mask.nonzero()) != 0, "Generated mask is empty :("

    return coor_mask


def infer_single_prompt(image_path, prompt, model_path, region=None, model_name="ferret_llama", conv_mode="ferret_llama_3", add_region_feature=False):
    """Run one prompt against one image and return the decoded model output.

    The model, tokenizer and image processor are loaded on every call and the
    tensors are moved to CUDA.

    Args:
        image_path: Path of the image to open (converted to RGB).
        prompt (str): User prompt. If it already contains the image token, only
            its second line is kept before the image token is prepended again.
        model_path: Passed to ``load_pretrained_model``.
        region: Optional ``(x1, y1, x2, y2)`` box in pixel coordinates of the
            opened image.
        model_name (str): Passed to ``load_pretrained_model``.
        conv_mode (str): Key into ``conv_templates``.
        add_region_feature (bool): Build a region mask from ``region`` when
            True and ``region`` is given.

    Returns:
        str: The first decoded sequence, stripped of surrounding whitespace.
    """
    img = Image.open(image_path).convert('RGB')

    # this loads the model, image processor and tokenizer
    tokenizer, model, image_processor, _context_len = load_pretrained_model(model_path, None, model_name)

    # define the image size required by clip
    image_size = {"height": 336, "width": 336}

    if DEFAULT_IMAGE_TOKEN in prompt:
        lines = prompt.split('\n')
        if len(lines) > 1:
            prompt = lines[1]
        else:
            # Single-line prompt: drop the token instead of raising IndexError.
            prompt = prompt.replace(DEFAULT_IMAGE_TOKEN, '').strip()

    if model.config.mm_use_im_start_end:
        prompt = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN + '\n' + prompt
    else:
        prompt = DEFAULT_IMAGE_TOKEN + '\n' + prompt

    # generate the prompt per template requirement
    conv = conv_templates[conv_mode].copy()
    conv.append_message(conv.roles[0], prompt)
    conv.append_message(conv.roles[1], None)
    prompt_input = conv.get_prompt()

    input_ids = tokenizer(prompt_input, return_tensors='pt')['input_ids'].cuda()

    # The processor is driven with the fixed CLIP size, not the raw image size.
    clip_w = image_size["width"]
    clip_h = image_size["height"]

    if model.config.image_aspect_ratio == "square_nocrop":
        image_tensor = image_processor.preprocess(img, return_tensors='pt', do_resize=True,
                                                  do_center_crop=False, size=[clip_h, clip_w])['pixel_values'][0]
    elif model.config.image_aspect_ratio == "anyres":
        # size is (height, width); process_anyres_image unpacks it in that order.
        image_process_func = partial(image_processor.preprocess, return_tensors='pt', do_resize=True,
                                     do_center_crop=False, size=[clip_h, clip_w])
        image_tensor = process_images([img], image_processor, model.config,
                                      image_process_func=image_process_func)[0]
    else:
        image_tensor = process_images([img], image_processor, model.config)[0]

    images = image_tensor.unsqueeze(0).to(torch.float16).cuda()

    # region mask logic (if region is provided)
    region_masks = None
    if add_region_feature and region is not None:
        # box_in is true
        raw_w, raw_h = img.size
        ratio_w = VOCAB_IMAGE_W * 1.0 / raw_w
        ratio_h = VOCAB_IMAGE_H * 1.0 / raw_h

        # preprocess the region
        box_x1, box_y1, box_x2, box_y2 = region
        box_x1_textvocab, box_y1_textvocab, box_x2_textvocab, box_y2_textvocab = get_bbox_coor(
            box=region, ratio_h=ratio_h, ratio_w=ratio_w)
        region_coordinate_raw = [box_x1, box_y1, box_x2, box_y2]

        region_mask = generate_mask_for_feature(region_coordinate_raw, raw_w, raw_h)
        # One list of masks per image in the batch (batch size is 1 here).
        region_masks = [[region_mask.cuda().half()]]

        # NOTE(review): prompt_input is not re-tokenized after this rewrite
        # (input_ids was built above), so the textual coordinates and region
        # token do not reach the model -- confirm whether that is intended.
        prompt_input = prompt_input.replace(
            REGION_PLACEHOLDER_TOKEN,
            f"[{box_x1_textvocab}, {box_y1_textvocab}, {box_x2_textvocab}, {box_y2_textvocab}] {DEFAULT_REGION_FEA_TOKEN}")

    # generate model output
    with torch.inference_mode():
        # Use region_masks in model's forward call
        model.orig_forward = model.forward
        model.forward = partial(model.orig_forward, region_masks=region_masks)
        try:
            output_ids = model.generate(
                input_ids,
                images=images,
                max_new_tokens=1024,
                num_beams=1,
                region_masks=region_masks,  # pass the region mask to the model
                image_sizes=[img.size]
            )
        finally:
            # Always undo the patch, even if generation fails.
            model.forward = model.orig_forward

    # we decode the output
    output_text = tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0]
    return output_text.strip()


# We also define a task-specific inference function
def infer_ui_task(image_path, prompt, model_path, task, region=None, add_region_feature=False):
    """
    Handles task types: box_in_tasks, box_out_tasks, no_box_tasks.

    Only tasks in ``box_in_tasks`` forward ``region`` to
    ``infer_single_prompt``; for the other task groups it is ignored.

    Raises:
        ValueError: If ``task`` is in ``box_in_tasks`` but ``region`` is None,
            or if ``task`` is not in any known task list.
    """
    if region is not None:
        add_region_feature = True

    if task in box_in_tasks:
        if region is None:
            raise ValueError(f"Task {task} requires a bounding box region.")
        print(f"Processing {task} with bounding box region.")
        return infer_single_prompt(image_path, prompt, model_path, region,
                                   add_region_feature=add_region_feature)

    if task in box_out_tasks:
        print(f"Processing {task} without bounding box region.")
        return infer_single_prompt(image_path, prompt, model_path)

    if task in no_box_tasks:
        # The image is still passed to the model; only the region is omitted.
        print(f"Processing {task} without image or bounding box.")
        return infer_single_prompt(image_path, prompt, model_path)

    raise ValueError(f"Unknown task type: {task}")