from typing import Dict, List, Any
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from modelscope import snapshot_download
from qwen_vl_utils import process_vision_info
import torch
import os
import base64
import io
from PIL import Image
import ffmpeg
import logging
import requests


class EndpointHandler():
    """Inference endpoint handler wrapping a Qwen2-VL model and its processor.

    The handler accepts a single prompt string in which image/video references
    are embedded between ``MEDIA_DELIMITER`` markers, builds a one-turn chat
    message from it, and returns the generated text.
    """

    # Marker that separates text from media references in the prompt: segments
    # at even positions are text, segments at odd positions are media.
    # NOTE(review): the original source called ``input_string.split("")``,
    # which always raises ``ValueError: empty separator`` -- the literal was
    # evidently lost. "<image>" is an assumption; confirm the marker clients
    # actually send and adjust this constant if it differs.
    MEDIA_DELIMITER = "<image>"

    # A media segment starting with this prefix is a video path; anything else
    # is treated as an image reference.
    VIDEO_PREFIX = "video:"

    # Sampling rate used when extracting video frames, also reported in the
    # "fps" field of the video content item.
    VIDEO_FPS = 1

    # Seconds to wait for a remote image before giving up; without a timeout
    # a stalled server would hang the request indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, path=""):
        """Load the model and processor from ``path``.

        Args:
            path: Directory (or identifier) passed to ``from_pretrained`` for
                both the model and the processor.
        """
        self.model_dir = path
        self.model = Qwen2VLForConditionalGeneration.from_pretrained(
            self.model_dir, torch_dtype="auto", device_map="auto"
        )
        self.processor = AutoProcessor.from_pretrained(self.model_dir)

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate text for one request.

        Args:
            data: Request payload with the keys:
                inputs (str): The input text, including any image or video
                    references wrapped in ``MEDIA_DELIMITER``.
                max_new_tokens (int, optional): Maximum number of tokens to
                    generate. Defaults to 128.

        Returns:
            A dictionary ``{"generated_text": <str>}``.

        Raises:
            ValueError: If ``inputs`` is missing or is not a string.
        """
        inputs = data.get("inputs")
        if not isinstance(inputs, str):
            raise ValueError("'inputs' must be provided as a string.")
        max_new_tokens = data.get("max_new_tokens", 128)

        # Construct the messages list from the input string.
        messages = [{"role": "user", "content": self._parse_input(inputs)}]
        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        model_inputs = self.processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        # The model is loaded with device_map="auto", so follow the device it
        # actually landed on instead of assuming "cuda"/"cpu".
        model_inputs = model_inputs.to(self.model.device)

        generated_ids = self.model.generate(
            **model_inputs, max_new_tokens=max_new_tokens
        )
        # generate() returns prompt + completion; drop the prompt tokens.
        generated_ids_trimmed = [
            out_ids[len(in_ids):]
            for in_ids, out_ids in zip(model_inputs.input_ids, generated_ids)
        ]
        output_text = self.processor.batch_decode(
            generated_ids_trimmed,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )[0]  # Single prompt in the batch, so return a single string.
        return {"generated_text": output_text}

    def _parse_input(self, input_string):
        """Parses the input string to identify image/video references and text.

        The string is split on ``MEDIA_DELIMITER``. Even-indexed segments are
        text; odd-indexed segments are media references (a ``video:<path>``
        entry, or an image URL / base64 data URI). Media that fails to load is
        skipped (the loaders log the reason).

        Args:
            input_string: Raw prompt string.

        Returns:
            A list of content dictionaries (``type`` of "text", "image" or
            "video") suitable for a chat message's ``content`` field.
        """
        # NOTE(review): media references come straight from the request and
        # are fetched / handed to ffmpeg as-is; restrict allowed hosts and
        # paths if this endpoint is exposed to untrusted callers.
        content = []
        for i, part in enumerate(input_string.split(self.MEDIA_DELIMITER)):
            part = part.strip()
            if i % 2 == 0:  # Text part
                if part:  # Skip empty segments around/between media markers.
                    content.append({"type": "text", "text": part})
                continue

            # Image/video part
            if part.startswith(self.VIDEO_PREFIX):
                # Slice the prefix off rather than split(), so a path that
                # itself contains "video:" is not truncated.
                video_path = part[len(self.VIDEO_PREFIX):].strip()
                video_frames = self._extract_video_frames(
                    video_path, fps=self.VIDEO_FPS
                )
                if video_frames:
                    content.append(
                        {
                            "type": "video",
                            "video": video_frames,
                            "fps": self.VIDEO_FPS,
                        }
                    )
            else:
                image = self._load_image(part)
                if image is not None:
                    content.append({"type": "image", "image": image})
        return content

    def _load_image(self, image_data):
        """Loads an image from a URL or base64 encoded string.

        Args:
            image_data: Either an ``http://``/``https://`` URL or a
                ``data:image...;base64,<payload>`` data URI.

        Returns:
            The decoded ``PIL.Image.Image``, or ``None`` if the reference is
            in an unsupported format or loading fails (the error is logged).
        """
        if image_data.startswith(("http://", "https://")):
            try:
                response = requests.get(image_data, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                # Decode from the fully downloaded body: response.raw would
                # bypass content-encoding handling and keep the socket open.
                image = Image.open(io.BytesIO(response.content))
                image.load()  # Surface corrupt-image errors here, not later.
            except Exception as e:  # Best effort: a bad image is skipped.
                logging.error("Error loading image from URL: %s", e)
                return None
        elif image_data.startswith("data:image"):
            try:
                # Everything after the first comma is the base64 payload.
                _, _, payload = image_data.partition(",")
                image = Image.open(io.BytesIO(base64.b64decode(payload)))
                image.load()
            except Exception as e:  # Best effort: a bad image is skipped.
                logging.error("Error loading image from base64: %s", e)
                return None
        else:
            logging.error("Invalid image data format. Must be URL or base64 encoded.")
            return None
        return image

    def _extract_video_frames(self, video_path, fps=1):
        """Extracts frames from a video at the specified FPS.

        Args:
            video_path: Path/URL of the video, passed to ffmpeg unchanged.
            fps: Number of frames to sample per second of video.

        Returns:
            A list of RGB ``PIL.Image.Image`` frames, or ``None`` if the video
            has no video stream, reports invalid dimensions, or ffmpeg fails.
        """
        try:
            probe = ffmpeg.probe(video_path)
            video_stream = next(
                (
                    stream
                    for stream in probe['streams']
                    if stream['codec_type'] == 'video'
                ),
                None,
            )
            if not video_stream:
                logging.error(f"No video stream found in {video_path}")
                return None
            width = int(video_stream['width'])
            height = int(video_stream['height'])
            out, _ = (
                ffmpeg
                .input(video_path)
                .filter('fps', fps=fps)
                .output('pipe:', format='rawvideo', pix_fmt='rgb24')
                .run(capture_stdout=True)
            )
        except ffmpeg.Error as e:
            logging.error("Error extracting video frames: %s", e)
            return None

        # rgb24 stores 3 bytes per pixel.
        # NOTE(review): dimensions come from the probe; if ffmpeg auto-rotates
        # the stream on output the decoded frames may need width/height
        # swapped -- confirm with rotated (e.g. phone) footage.
        frame_size = width * height * 3
        if frame_size <= 0:
            logging.error(f"Invalid video dimensions in {video_path}")
            return None

        # Only decode complete frames; a truncated trailing chunk would make
        # Image.frombytes raise ValueError.
        usable_bytes = len(out) - (len(out) % frame_size)
        return [
            Image.frombytes('RGB', (width, height), out[start:start + frame_size])
            for start in range(0, usable_bytes, frame_size)
        ]