"""
Based on ImageCaptionLoader from LangChain (langchain/document_loaders/image_captions.py),
but accepts a preloaded model to avoid slow per-use loading and CUDA forking issues.

Loader that generates image captions.

By default, the loader uses the pre-trained image captioning model
https://huggingface.co/microsoft/Florence-2-base
"""

from io import BytesIO
from pathlib import Path
from typing import Any, List, Tuple, Union

import requests
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader

from utils import get_device, NullContext, clear_torch_cache

from importlib.metadata import distribution, PackageNotFoundError

try:
    assert distribution('bitsandbytes') is not None
    have_bitsandbytes = True
except (PackageNotFoundError, AssertionError):
    have_bitsandbytes = False
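
# Note: H2OImageCaptionLoader silently disables load_in_8bit when bitsandbytes
# is unavailable (self.load_in_8bit = load_in_8bit and have_bitsandbytes below).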


class ImageCaptionLoader(BaseLoader):
    """Load image captions.

    By default, the loader utilizes the pre-trained
    Salesforce BLIP image captioning model.
    https://huggingface.co/Salesforce/blip-image-captioning-base
    """

    def __init__(
        self,
        images: Union[str, Path, bytes, List[Union[str, bytes, Path]]],
        caption_processor: str = "Salesforce/blip-image-captioning-base",
        caption_model: str = "Salesforce/blip-image-captioning-base",
    ):
        """Initialize with a list of image data (bytes) or file paths.

        Args:
            images: Either a single image or a list of images. Accepts
                image data (bytes) or file paths to images.
            caption_processor: The name of the pre-trained BLIP processor.
            caption_model: The name of the pre-trained BLIP model.
        """
        if isinstance(images, (str, Path, bytes)):
            self.images = [images]
        else:
            self.images = images

        self.caption_processor = caption_processor
        self.caption_model = caption_model

    def load(self) -> List[Document]:
        """Load from a list of image data or file paths."""
        try:
            from transformers import BlipForConditionalGeneration, BlipProcessor
        except ImportError:
            raise ImportError(
                "`transformers` package not found, please install with "
                "`pip install transformers`."
            )

        processor = BlipProcessor.from_pretrained(self.caption_processor)
        model = BlipForConditionalGeneration.from_pretrained(self.caption_model)

        results = []
        for image in self.images:
            caption, metadata = self._get_captions_and_metadata(
                model=model, processor=processor, image=image
            )
            doc = Document(page_content=caption, metadata=metadata)
            results.append(doc)

        return results

    def _get_captions_and_metadata(
        self, model: Any, processor: Any, image: Union[str, Path, bytes]
    ) -> Tuple[str, dict]:
        """Helper function for getting the captions and metadata of an image."""
        try:
            from PIL import Image
        except ImportError:
            raise ImportError(
                "`PIL` package not found, please install with `pip install pillow`"
            )

        image_source = image

        try:
            if isinstance(image, bytes):
                image = Image.open(BytesIO(image)).convert("RGB")
            elif isinstance(image, str) and (
                image.startswith("http://") or image.startswith("https://")
            ):
                image = Image.open(requests.get(image, stream=True).raw).convert("RGB")
            else:
                image = Image.open(image).convert("RGB")
        except Exception:
            if isinstance(image_source, bytes):
                msg = "Could not get image data from bytes"
            else:
                msg = f"Could not get image data for {image_source}"
            raise ValueError(msg)

        inputs = processor(image, "an image of", return_tensors="pt")
        output = model.generate(**inputs)

        caption: str = processor.decode(output[0])
        if isinstance(image_source, bytes):
            metadata: dict = {"image_source": "Image bytes provided"}
        else:
            metadata = {"image_path": str(image_source)}

        return caption, metadata
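
# The class above is a vendored copy of LangChain's ImageCaptionLoader; a sketch
# of its output shape (the file name is hypothetical):
#   ImageCaptionLoader("cat.png").load()
#   -> [Document(page_content="an image of a cat ...", metadata={"image_path": "cat.png"})]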


class H2OImageCaptionLoader(ImageCaptionLoader):
    """Loader that loads the captions of an image."""

    def __init__(self, path_images: Union[str, List[str]] = None,
                 caption_processor: str = None,
                 caption_model: str = None,
                 caption_gpu=True,
                 load_in_8bit=True,
                 load_half=False,
                 load_gptq='',
                 load_awq='',
                 load_exllama=False,
                 use_safetensors=False,
                 revision=None,
                 min_new_tokens=512,
                 max_tokens=50,
                 gpu_id='auto'):
        if caption_processor is None or caption_model is None:
            caption_processor = "microsoft/Florence-2-base"
            caption_model = "microsoft/Florence-2-base"

        super().__init__(path_images, caption_processor, caption_model)
        # load() iterates over self.image_paths, so initialize it here rather
        # than relying on a later set_image_paths() call
        self.set_image_paths(path_images)
        self.caption_processor = caption_processor
        self.caption_model = caption_model
        self.processor = None
        self.model = None
        self.caption_gpu = caption_gpu
        self.context_class = NullContext
        self.load_in_8bit = load_in_8bit and have_bitsandbytes
        self.load_half = load_half
        self.load_gptq = load_gptq
        self.load_awq = load_awq
        self.load_exllama = load_exllama
        self.use_safetensors = use_safetensors
        self.revision = revision
        self.gpu_id = gpu_id

        self.prompt = "image of"
        self.min_new_tokens = min_new_tokens
        self.max_tokens = max_tokens

        self.device = 'cpu'
        self.device_map = {"": 'cpu'}
        self.set_context()

    def set_context(self):
        if get_device() == 'cuda' and self.caption_gpu:
            import torch
            n_gpus = torch.cuda.device_count() if torch.cuda.is_available() else 0
            if n_gpus > 0:
                self.context_class = torch.device
                self.device = 'cuda'
            else:
                self.device = 'cpu'
        else:
            self.device = 'cpu'
        if self.caption_gpu:
            if self.gpu_id == 'auto':
                # gpu_id 'auto': pin weights to the first GPU rather than
                # spreading them with device_map='auto'
                self.device_map = {"": 0}
            else:
                if self.device == 'cuda':
                    self.device_map = {"": 'cuda:%d' % self.gpu_id}
                else:
                    self.device_map = {"": 'cpu'}
        else:
            self.device_map = {"": 'cpu'}
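
    # Illustrative outcomes of set_context() (assuming CUDA is available):
    #   caption_gpu=True,  gpu_id='auto' -> device='cuda', device_map={"": 0}
    #   caption_gpu=True,  gpu_id=1      -> device='cuda', device_map={"": 'cuda:1'}
    #   caption_gpu=False                -> device='cpu',  device_map={"": 'cpu'}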

    def load_model(self):
        try:
            import transformers
        except ImportError:
            raise ImportError(
                "`transformers` package not found, please install with "
                "`pip install transformers`."
            )
        self.set_context()
        if self.model:
            # str() both sides: device_map values may be ints (e.g. 0), not strings
            if not self.load_in_8bit and str(self.model.device) != str(self.device_map['']):
                self.model.to(self.device)
            return self
        import torch
        with torch.no_grad():
            with self.context_class(self.device):
                context_class_cast = NullContext if self.device == 'cpu' else torch.autocast
                with context_class_cast(self.device):
                    if 'blip2' in self.caption_processor.lower():
                        from transformers import Blip2Processor, Blip2ForConditionalGeneration
                        # processors hold no weights, so only the model is halved
                        self.processor = Blip2Processor.from_pretrained(self.caption_processor)
                        if self.load_half and not self.load_in_8bit:
                            self.model = Blip2ForConditionalGeneration.from_pretrained(self.caption_model,
                                                                                       device_map=self.device_map).half()
                        else:
                            self.model = Blip2ForConditionalGeneration.from_pretrained(self.caption_model,
                                                                                       load_in_8bit=self.load_in_8bit,
                                                                                       device_map=self.device_map)
                    elif 'blip' in self.caption_processor.lower():
                        from transformers import BlipForConditionalGeneration, BlipProcessor
                        self.load_half = False
                        self.processor = BlipProcessor.from_pretrained(self.caption_processor)
                        self.model = BlipForConditionalGeneration.from_pretrained(self.caption_model,
                                                                                  device_map=self.device_map)
                    else:
                        from transformers import AutoModelForCausalLM, AutoProcessor
                        self.load_half = False
                        self.processor = AutoProcessor.from_pretrained(self.caption_processor,
                                                                       trust_remote_code=True)
                        self.model = AutoModelForCausalLM.from_pretrained(self.caption_model,
                                                                          device_map=self.device_map,
                                                                          trust_remote_code=True)
        return self
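
    # Typical lifecycle (a sketch; the image path is hypothetical):
    #   loader = H2OImageCaptionLoader(caption_gpu=False).load_model()
    #   loader.set_image_paths(["images/receipt.png"])
    #   docs = loader.load()
    #   loader.unload_model()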

    def set_image_paths(self, path_images: Union[str, List[str]]):
        """Set the image file paths (or URLs) to load from."""
        if isinstance(path_images, str):
            self.image_paths = [path_images]
        else:
            self.image_paths = path_images

    def load(self, prompt=None) -> List[Document]:
        if self.processor is None or self.model is None:
            self.load_model()
        results = []
        for path_image in self.image_paths:
            caption, metadata = self._get_captions_and_metadata(
                model=self.model, processor=self.processor, path_image=path_image,
                prompt=prompt,
            )
            doc = Document(page_content=caption, metadata=metadata)
            results.append(doc)

        return results

    def unload_model(self):
        if hasattr(self, 'model') and hasattr(self.model, 'cpu'):
            self.model.cpu()
            clear_torch_cache()

    def _get_captions_and_metadata(
            self, model: Any, processor: Any, path_image: str,
            prompt=None) -> Tuple[str, dict]:
        """Helper function for getting the captions and metadata of an image."""
        if prompt is None:
            prompt = self.prompt
        try:
            from PIL import Image
        except ImportError:
            raise ImportError(
                "`PIL` package not found, please install with `pip install pillow`"
            )

        try:
            if path_image.startswith("http://") or path_image.startswith("https://"):
                image = Image.open(requests.get(path_image, stream=True).raw).convert(
                    "RGB"
                )
            else:
                image = Image.open(path_image).convert("RGB")
        except Exception:
            raise ValueError(f"Could not get image data for {path_image}")

        import torch
        with torch.no_grad():
            with self.context_class(self.device):
                context_class_cast = NullContext if self.device == 'cpu' else torch.autocast
                with context_class_cast(self.device):
                    extra_kwargs = {}
                    task_prompt = None

                    if isinstance(self.caption_model, str) and 'florence' in self.caption_model.lower():
                        caption_detail_task_map = {
                            "low": "<CAPTION>",
                            "medium": "<DETAILED_CAPTION>",
                            "high": "<MORE_DETAILED_CAPTION>",
                        }
                        task_prompt = caption_detail_task_map[
                            'high' if 'large' in self.caption_model else 'medium'
                        ]
                        num_beams = 3 if 'large' in self.caption_model else 1
                        extra_kwargs.update(dict(num_beams=num_beams))
                        # Florence-2 expects a bare task token, so the free-text
                        # prompt is replaced rather than prepended
                        prompt = task_prompt

                    if isinstance(self.caption_model, str) and 'blip' in self.caption_model:
                        # rough chars->tokens estimate so min_length scales with the prompt
                        min_length = len(prompt) // 4 + self.min_new_tokens
                        self.max_tokens = max(self.max_tokens, min_length)
                        extra_kwargs.update(dict(min_length=min_length))
                        if self.load_half:
                            # cast floating inputs (pixel values) to match the half model
                            inputs = processor(image, prompt, return_tensors="pt").to(torch.float16)
                        else:
                            inputs = processor(image, prompt, return_tensors="pt")
                    else:
                        inputs = processor(text=prompt, images=image, return_tensors="pt")
                    inputs = inputs.to(model.device)
                    output = model.generate(**inputs, max_length=self.max_tokens, **extra_kwargs)

                    caption: str = processor.decode(output[0], skip_special_tokens=True)
                    if isinstance(self.caption_model, str) and 'blip' in self.caption_model:
                        # drop the echoed prompt from the front of the caption
                        prompti = caption.find(prompt)
                        if prompti >= 0:
                            caption = caption[prompti + len(prompt):]
                    elif isinstance(self.caption_model, str) and 'florence' in self.caption_model.lower():
                        parsed_answer = processor.post_process_generation(
                            caption, task=task_prompt, image_size=(image.width, image.height)
                        )
                        caption = parsed_answer[task_prompt].strip()

        metadata: dict = {"image_path": path_image}

        return caption, metadata
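

if __name__ == "__main__":
    # Minimal smoke test (a sketch): caption the images passed on the command line
    import sys

    example_loader = H2OImageCaptionLoader(caption_gpu=False)
    example_loader.set_image_paths(sys.argv[1:])
    for doc in example_loader.load():
        print(doc.metadata["image_path"], "->", doc.page_content)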