import torch
import open_clip
from PIL import Image
import requests
import json
import gradio as gr
import pandas as pd
from io import BytesIO
import os
from collections import defaultdict

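# The Amazon taxonomy is assumed to be a nested structure in which each level
# maps a category name to its sub-categories (dict) or to a list of leaf names.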
with open("amazon.json", "r") as f:
    AMAZON_TAXONOMY = json.load(f)

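# Load the three image encoders compared in the demo: a ViT-B-16 CLIP baseline
# and Marqo's e-commerce embedding models (B and L) from the Hugging Face Hub.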
base_model_name = "ViT-B-16"
# NOTE: `pretrained="openai"` is assumed here so the baseline uses OpenAI's
# released CLIP weights; without it, open_clip returns a randomly initialised model.
model_base, _, preprocess_base = open_clip.create_model_and_transforms(
    base_model_name, pretrained="openai"
)
tokenizer_base = open_clip.get_tokenizer(base_model_name)

model_name_B = "hf-hub:Marqo/marqo-ecommerce-embeddings-B"
model_B, _, preprocess_B = open_clip.create_model_and_transforms(model_name_B)
tokenizer_B = open_clip.get_tokenizer(model_name_B)

model_name_L = "hf-hub:Marqo/marqo-ecommerce-embeddings-L"
model_L, _, preprocess_L = open_clip.create_model_and_transforms(model_name_L)
tokenizer_L = open_clip.get_tokenizer(model_name_L)

models = [base_model_name, model_name_B, model_name_L]

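# Each "<model>.json" file is assumed to hold precomputed text embeddings for
# every category name in the taxonomy, keyed by category, so no text encoding
# is needed at inference time.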
taxonomy_cache = {}
for model in models:
    with open(f'{model.split("/")[-1]}.json', "r") as f:
        taxonomy_cache[model] = json.load(f)


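# Cosine similarity rescaled from [-1, 1] to [0, 1] so that scores can be
# summed and averaged as positive quantities during the beam search.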
def cosine_similarity(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    numerator = (a * b).sum(dim=-1)
    denominator = torch.linalg.norm(a, ord=2, dim=-1) * torch.linalg.norm(
        b, ord=2, dim=-1
    )
    return 0.5 * (numerator / denominator + 1.0)


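# A single hypothesis in the beam: the (category, score) pairs chosen so far,
# their summed score, and the taxonomy sub-tree still to be descended.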
class BeamPath:
    def __init__(self, path: list, cumulative_score: float, current_layer: dict | list):
        self.path = path
        self.cumulative_score = cumulative_score
        self.current_layer = current_layer

    def __repr__(self):
        return f"BeamPath(path={self.path}, cumulative_score={self.cumulative_score})"


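# Similarity between the image embedding and the cached text embeddings of the
# given class names for the selected model.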
def _compute_similarities(classes: list, base_embedding: torch.Tensor, cache_key: str):
    text_features = torch.tensor(
        [taxonomy_cache[cache_key][class_name] for class_name in classes]
    )

    similarities = cosine_similarity(base_embedding, text_features)
    return similarities.cpu().numpy()


def map_taxonomy(
    base_image: Image.Image,
    taxonomy: dict,
    model,
    tokenizer,
    preprocess_val,
    cache_key,
    beam_width: int = 3,
) -> tuple[list[tuple[str, float]], float]:
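    """Beam search down the taxonomy for the best category path of an image.

    At each level, the candidate category names are scored against the image
    embedding via the cached text embeddings, and only the top `beam_width`
    partial paths per top-level branch are kept. Returns the best path as
    (category, score) pairs together with its cumulative score. `tokenizer`
    is accepted for interface parity but is unused, since text embeddings are
    precomputed.
    """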
    image_tensor = preprocess_val(base_image).unsqueeze(0)
    with torch.no_grad(), torch.cuda.amp.autocast():
        base_embedding = model.encode_image(image_tensor, normalize=True)

    initial_path = BeamPath(path=[], cumulative_score=0.0, current_layer=taxonomy)
    beam = [initial_path]

    final_paths = []
    is_first = True
    while beam:
        candidates = []
        candidate_entries = []

        for beam_path in beam:
            layer = beam_path.current_layer

            # A dict layer has further sub-categories; a list layer holds leaf
            # category names; anything else (or an empty list) ends the path.
            if isinstance(layer, dict):
                classes = list(layer.keys())
            elif isinstance(layer, list):
                classes = layer
                if classes == []:
                    final_paths.append(beam_path)
                    continue
            else:
                final_paths.append(beam_path)
                continue

            for class_name in classes:
                candidate_string = class_name
                if isinstance(layer, dict):
                    next_layer = layer[class_name]
                else:
                    next_layer = None
                candidate_entries.append(
                    (candidate_string, class_name, beam_path, next_layer)
                )

        if not candidate_entries:
            break

        candidate_strings = [
            candidate_string for candidate_string, _, _, _ in candidate_entries
        ]

        similarities = _compute_similarities(
            candidate_strings, base_embedding, cache_key
        )

        for (candidate_string, class_name, beam_path, next_layer), similarity in zip(
            candidate_entries, similarities
        ):
            new_path = beam_path.path + [(class_name, float(similarity))]
            new_cumulative_score = beam_path.cumulative_score + similarity
            candidate = BeamPath(
                path=new_path,
                cumulative_score=new_cumulative_score,
                current_layer=next_layer,
            )
            candidates.append(candidate)

        # Group candidates by their top-level category and keep the best
        # `beam_width` per group (all of them on the first level), ranked by
        # mean path score plus the newest step's score.
        by_parents = defaultdict(list)
        for candidate in candidates:
            by_parents[candidate.path[0][0]].append(candidate)

        beam = []
        for parent in by_parents:
            children = by_parents[parent]
            children.sort(
                key=lambda x: x.cumulative_score / len(x.path) + x.path[-1][1],
                reverse=True,
            )
            if is_first:
                beam.extend(children)
            else:
                beam.extend(children[:beam_width])

        is_first = False

    all_paths = beam + final_paths

    if all_paths:
        all_paths.sort(key=lambda x: x.cumulative_score / len(x.path), reverse=True)
        best_path = all_paths[0]
        return best_path.path, float(best_path.cumulative_score)
    else:
        return [], 0.0


def classify_image(
    image_input: Image.Image | None,
    image_url: str | None,
    model_size: str,
    beam_width: int,
):
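    """Gradio callback: resolve the input image, pick the selected model, and
    return the predicted taxonomy path as a DataFrame (or an error message).
    """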
    if image_input is not None:
        image = image_input
    elif image_url:
        try:
            # A request timeout is assumed here so a bad URL cannot hang the app.
            response = requests.get(image_url, timeout=10)
            image = Image.open(BytesIO(response.content)).convert("RGB")
        except Exception as e:
            return pd.DataFrame({"Error": [str(e)]})
    else:
        return pd.DataFrame(
            {
                "Error": [
                    "Please provide an image, an image URL, or select an example image"
                ]
            }
        )

    if model_size == "marqo-ecommerce-embeddings-L":
        key = "hf-hub:Marqo/marqo-ecommerce-embeddings-L"
        model = model_L
        preprocess_val = preprocess_L
        tokenizer = tokenizer_L
    elif model_size == "marqo-ecommerce-embeddings-B":
        key = "hf-hub:Marqo/marqo-ecommerce-embeddings-B"
        model = model_B
        preprocess_val = preprocess_B
        tokenizer = tokenizer_B
    elif model_size == "openai-ViT-B-16":
        key = "ViT-B-16"
        model = model_base
        preprocess_val = preprocess_base
        tokenizer = tokenizer_base
    else:
        return pd.DataFrame({"Error": ["Invalid model size"]})

    path, cumulative_score = map_taxonomy(
        base_image=image,
        taxonomy=AMAZON_TAXONOMY,
        model=model,
        tokenizer=tokenizer,
        preprocess_val=preprocess_val,
        cache_key=key,
        # gr.Number can deliver a float; beam_width must be an int for slicing.
        beam_width=int(beam_width),
    )

    output = []
    for idx, (category, score) in enumerate(path):
        level = idx + 1
        output.append({"Level": level, "Category": category, "Score": score})

    df = pd.DataFrame(output)
    return df


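# Gradio UI: image upload / URL / example inputs, model and beam-width controls,
# and a results table showing one taxonomy level per row.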
with gr.Blocks() as demo:
    gr.Markdown("# Image Classification with Taxonomy Mapping")
    gr.Markdown(
        "## How to use this app\n\nThis app compares Marqo's e-commerce embedding models with OpenAI's ViT-B-16 CLIP model for e-commerce taxonomy mapping. A beam search over the taxonomy tree is used to find the best category path for each image. The original OpenAI CLIP models perform very poorly on e-commerce data."
    )
    gr.Markdown(
        "Upload an image, provide an image URL, or select an example image, choose a model, and get the taxonomy mapping. The taxonomy is based on the Amazon product taxonomy."
    )

    with gr.Row():
        with gr.Column():
            image_input = gr.Image(type="pil", label="Upload Image", height=300)
            image_url_input = gr.Textbox(
                lines=1, placeholder="Image URL", label="Image URL"
            )
            gr.Markdown("### Or select an example image:")

            example_images_folder = "images"
            example_image_paths = [
                os.path.join(example_images_folder, img)
                for img in os.listdir(example_images_folder)
            ]
            gr.Examples(
                examples=[[img_path] for img_path in example_image_paths],
                inputs=image_input,
                label="Example Images",
                examples_per_page=100,
            )
        with gr.Column():
            model_size_input = gr.Radio(
                choices=[
                    "marqo-ecommerce-embeddings-L",
                    "marqo-ecommerce-embeddings-B",
                    "openai-ViT-B-16",
                ],
                label="Model",
                value="marqo-ecommerce-embeddings-L",
            )
            beam_width_input = gr.Number(
                label="Beam Width", value=5, minimum=1, step=1
            )
            classify_button = gr.Button("Classify")
            output_table = gr.Dataframe(headers=["Level", "Category", "Score"])

    classify_button.click(
        fn=classify_image,
        inputs=[image_input, image_url_input, model_size_input, beam_width_input],
        outputs=output_table,
    )

demo.launch()