File size: 3,416 Bytes
af69776
9a42ebe
 
 
 
 
 
8c9844e
128787b
9a42ebe
069cb3b
 
9a42ebe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
af69776
 
9a42ebe
af69776
9a42ebe
069cb3b
 
 
af69776
 
9a42ebe
069cb3b
 
 
 
 
 
 
 
9a42ebe
af69776
 
 
069cb3b
 
af69776
069cb3b
af69776
 
 
43a59c2
af69776
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from typing import Dict, Any, List
from PIL import Image
import torch
from transformers import AutoModelForCausalLM, AutoProcessor
from transformers.image_utils import to_numpy_array, PILImageResampling, ChannelDimension
from transformers.image_transforms import resize, to_channel_dimension_format

class EndpointHandler:
    """Callable handler that turns one input image into generated text.

    On construction the processor and causal language model are loaded from
    ``model_path``. Each call reads an image from ``data["inputs"]``, builds
    a prompt made of image placeholder tokens, runs ``model.generate`` and
    returns the decoded text.
    """

    # Size passed to ``resize`` for every input image before normalization.
    IMAGE_SIZE = (960, 960)
    # ``max_length`` forwarded to ``model.generate``.
    MAX_LENGTH = 2048

    def __init__(self, model_path: str):
        """Load the processor and model found at ``model_path``.

        Args:
            model_path: Location handed to ``from_pretrained`` for both the
                processor and the model.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.processor = AutoProcessor.from_pretrained(model_path)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_path,
            trust_remote_code=True,
            torch_dtype=torch.bfloat16,
        ).to(self.device)
        # Number of "<image>" placeholder tokens inserted into the prompt.
        self.image_seq_len = self.model.config.perceiver_config.resampler_n_latents
        self.bos_token = self.processor.tokenizer.bos_token
        # Token ids of the image placeholder tokens; passed to ``generate`` as
        # ``bad_words_ids`` on every request.
        self.bad_words_ids = self.processor.tokenizer(
            ["<image>", "<fake_token_around_image>"],
            add_special_tokens=False,
        ).input_ids

    def convert_to_rgb(self, image: Image.Image) -> Image.Image:
        """Return ``image`` in RGB mode.

        An image that is already RGB is returned unchanged. Any other mode is
        converted to RGBA and alpha-composited over a background filled with
        (255, 255, 255) before being converted to RGB.
        """
        if image.mode == "RGB":
            return image
        image_rgba = image.convert("RGBA")
        background = Image.new("RGBA", image_rgba.size, (255, 255, 255))
        alpha_composite = Image.alpha_composite(background, image_rgba)
        return alpha_composite.convert("RGB")

    def custom_transform(self, image: Image.Image) -> torch.Tensor:
        """Preprocess one image into a channels-first tensor.

        Steps, in order: convert to RGB, convert to a numpy array, resize to
        ``IMAGE_SIZE`` with bilinear resampling, rescale by 1/255, normalize
        with the image processor's mean/std, and move channels first.
        """
        image = self.convert_to_rgb(image)
        image = to_numpy_array(image)
        image = resize(image, self.IMAGE_SIZE, resample=PILImageResampling.BILINEAR)
        image = self.processor.image_processor.rescale(image, scale=1 / 255)
        image = self.processor.image_processor.normalize(
            image,
            mean=self.processor.image_processor.image_mean,
            std=self.processor.image_processor.image_std,
        )
        image = to_channel_dimension_format(image, ChannelDimension.FIRST)
        return torch.tensor(image)

    def generate_responses(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate text for the image found under ``data["inputs"]``.

        Args:
            data: Request payload. ``data["inputs"]`` is either an image
                object or a string, which is opened with ``Image.open``.

        Returns:
            A one-element list: ``{"label": <text>, "score": 1.0}`` on
            success, or ``{"error": <message>}`` on failure. Errors are
            reported in the return value rather than raised.
        """
        image = data.get("inputs")
        if image is None:
            # Fail fast with a clear message instead of an opaque error from
            # deep inside the image transform.
            return [{"error": "Missing 'inputs' in request payload"}]

        if isinstance(image, str):
            try:
                # Image.open is lazy: force the pixel data to be read while
                # the file is open so the handle is released deterministically
                # and decoding problems are reported as open failures.
                with Image.open(image) as opened:
                    opened.load()
                image = opened
            except Exception as e:
                return [{"error": f"Failed to open image: {e}"}]

        try:
            inputs = self.processor.tokenizer(
                f"{self.bos_token}<fake_token_around_image>{'<image>' * self.image_seq_len}<fake_token_around_image>",
                return_tensors="pt",
                add_special_tokens=False,
            )
            inputs["pixel_values"] = self.processor.image_processor([image], transform=self.custom_transform)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            generated_ids = self.model.generate(
                **inputs,
                bad_words_ids=self.bad_words_ids,
                max_length=self.MAX_LENGTH,
                early_stopping=True,
            )
            generated_text = self.processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        except torch.cuda.CudaError as e:
            return [{"error": f"CUDA error: {e}"}]
        except Exception as e:
            # Request boundary: report the failure to the caller instead of
            # letting it propagate out of the handler.
            return [{"error": f"Unexpected error: {e}"}]

        return [{"label": generated_text, "score": 1.0}]

    def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Handle one request; delegates to ``generate_responses``."""
        return self.generate_responses(data)