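# NOTE: the following six lines are file-viewer page residue, not program code;
# they are kept only as comments so the module can be parsed.
# OwenElliott's picture
# Upload 18 files
# b6c64a0 verified
# raw
# history blame
# 9.22 kB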
import torch
import open_clip
from PIL import Image
import requests
import json
import gradio as gr
import pandas as pd
from io import BytesIO
import os
# Load the Amazon taxonomy from a JSON file
with open("amazon.json", "r", encoding="utf-8") as f:
    AMAZON_TAXONOMY = json.load(f)

# Baseline CLIP model. Without a `pretrained` tag open_clip builds the
# architecture with randomly initialised weights, which would make the
# "openai-ViT-B-16" baseline meaningless, so the OpenAI checkpoint is
# requested explicitly.
# NOTE(review): assumes the cached text embeddings in "ViT-B-16.json" were
# produced with the OpenAI weights -- confirm against the script that built it.
base_model_name = "ViT-B-16"
model_base, _, preprocess_base = open_clip.create_model_and_transforms(
    base_model_name, pretrained="openai"
)
model_base.eval()  # inference only
tokenizer_base = open_clip.get_tokenizer(base_model_name)

# Marqo e-commerce embedding models (weights are resolved from the HF hub name).
model_name_B = "hf-hub:Marqo/marqo-ecommerce-embeddings-B"
model_B, _, preprocess_B = open_clip.create_model_and_transforms(model_name_B)
model_B.eval()  # inference only
tokenizer_B = open_clip.get_tokenizer(model_name_B)

model_name_L = "hf-hub:Marqo/marqo-ecommerce-embeddings-L"
model_L, _, preprocess_L = open_clip.create_model_and_transforms(model_name_L)
model_L.eval()  # inference only
tokenizer_L = open_clip.get_tokenizer(model_name_L)

# Per-model lookup tables read from "<last path component of model name>.json".
# Each table is indexed by taxonomy class name when similarities are computed.
models = [base_model_name, model_name_B, model_name_L]
taxonomy_cache = {}
for model in models:
    with open(f'{model.split("/")[-1]}.json', "r", encoding="utf-8") as f:
        taxonomy_cache[model] = json.load(f)
def cosine_similarity(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    """Return the cosine similarity of ``a`` and ``b`` rescaled to ``[0, 1]``.

    The similarity is taken over the last dimension; the remaining dimensions
    follow normal broadcasting rules. The raw cosine value in ``[-1, 1]`` is
    mapped through ``0.5 * (cos + 1)``.

    Args:
        a: First tensor of vectors.
        b: Second tensor of vectors, broadcastable against ``a``.

    Returns:
        Tensor of rescaled similarities with the last dimension reduced away.
        A zero-norm input yields ``0.5`` (cosine treated as 0) instead of NaN.
    """
    numerator = (a * b).sum(dim=-1)
    denominator = torch.linalg.norm(a, ord=2, dim=-1) * torch.linalg.norm(
        b, ord=2, dim=-1
    )
    # Guard against 0/0 for zero-norm vectors. The smallest positive normal
    # value of the dtype is used so that ordinary inputs are left untouched.
    denominator = denominator.clamp_min(torch.finfo(denominator.dtype).tiny)
    return 0.5 * (numerator / denominator + 1.0)
class BeamPath:
def __init__(self, path: list, cumulative_score: float, current_layer: dict | list):
self.path = path
self.cumulative_score = cumulative_score
self.current_layer = current_layer
def __repr__(self):
return f"BeamPath(path={self.path}, cumulative_score={self.cumulative_score})"
def _compute_similarities(classes: list, base_embedding: torch.Tensor, cache_key: str):
    """Score ``base_embedding`` against the cached vectors of ``classes``.

    For every name in ``classes`` the entry ``taxonomy_cache[cache_key][name]``
    is looked up, the entries are stacked into one tensor and compared with
    ``base_embedding`` through ``cosine_similarity``.

    Returns:
        The similarities as a NumPy array, in the same order as ``classes``.
    """
    cached_rows = []
    for class_name in classes:
        cached_rows.append(taxonomy_cache[cache_key][class_name])
    text_features = torch.tensor(cached_rows)
    scores = cosine_similarity(base_embedding, text_features)
    on_cpu = scores.cpu()
    return on_cpu.numpy()
def map_taxonomy(
    base_image: Image.Image,
    taxonomy: dict,
    model,
    tokenizer,
    preprocess_val,
    cache_key,
    beam_width: int = 3,
) -> tuple[list[tuple[str, float]], float]:
    """Find the best-scoring taxonomy path for ``base_image`` with a beam search.

    The taxonomy is walked one level at a time. At each level every class of
    every live path is scored against the image embedding, and the candidates
    are pruned per top-level category.

    Args:
        base_image: Image handed to ``preprocess_val``.
        taxonomy: Nested structure of class names. A dict level maps a class
            name to its sub-level; a list level holds class names with nothing
            below them; an empty container or any other value ends the path.
        model: Object exposing ``encode_image(tensor, normalize=True)``.
        tokenizer: Unused; kept so existing callers keep working.
        preprocess_val: Callable turning ``base_image`` into a tensor; a
            leading dimension is added before encoding.
        cache_key: Key forwarded to ``_compute_similarities``.
        beam_width: Candidates kept per top-level category at every level
            after the first. Coerced to ``int`` and clamped to at least 1.

    Returns:
        ``(path, cumulative_score)`` where ``path`` is a list of
        ``(class_name, similarity)`` pairs from the root down and
        ``cumulative_score`` is the sum of those similarities. Returns
        ``([], 0.0)`` when no path could be built.
    """
    from collections import defaultdict

    # Nothing to classify against: skip the (expensive) image encoding.
    if not taxonomy:
        return [], 0.0

    # UI widgets may deliver the width as a float; slicing needs an int.
    beam_width = max(1, int(beam_width))

    image_tensor = preprocess_val(base_image).unsqueeze(0)
    with torch.no_grad(), torch.cuda.amp.autocast():
        base_embedding = model.encode_image(image_tensor, normalize=True)

    beam = [BeamPath(path=[], cumulative_score=0.0, current_layer=taxonomy)]
    final_paths = []
    is_first = True

    while beam:
        # (class_name, parent_path, next_layer) for every expandable class.
        candidate_entries = []
        for beam_path in beam:
            layer = beam_path.current_layer
            if isinstance(layer, dict):
                classes = list(layer)
            elif isinstance(layer, list):
                classes = layer
            else:
                classes = []

            if not classes:
                # Leaf reached (empty dict/list, None, or unsupported value).
                final_paths.append(beam_path)
                continue

            for class_name in classes:
                next_layer = layer[class_name] if isinstance(layer, dict) else None
                candidate_entries.append((class_name, beam_path, next_layer))

        if not candidate_entries:
            # Every live path was finalised above; avoid listing them twice.
            beam = []
            break

        # One batched similarity call for the whole level.
        similarities = _compute_similarities(
            [class_name for class_name, _, _ in candidate_entries],
            base_embedding,
            cache_key,
        )

        # Group the new candidates by their top-level category so that each
        # top-level branch keeps its own beam.
        by_parents = defaultdict(list)
        for (class_name, beam_path, next_layer), similarity in zip(
            candidate_entries, similarities
        ):
            score = float(similarity)
            candidate = BeamPath(
                path=beam_path.path + [(class_name, score)],
                cumulative_score=beam_path.cumulative_score + score,
                current_layer=next_layer,
            )
            by_parents[candidate.path[0][0]].append(candidate)

        beam = []
        for children in by_parents.values():
            # Rank by mean similarity along the path plus the newest step.
            children.sort(
                key=lambda c: c.cumulative_score / len(c.path) + c.path[-1][1],
                reverse=True,
            )
            # The first level is never pruned: every top-level category
            # gets a chance to be explored.
            beam.extend(children if is_first else children[:beam_width])
        is_first = False

    # Paths with no steps (the bare root) cannot be averaged or reported.
    all_paths = [p for p in beam + final_paths if p.path]
    if not all_paths:
        return [], 0.0

    best_path = max(all_paths, key=lambda p: p.cumulative_score / len(p.path))
    return best_path.path, float(best_path.cumulative_score)
# Function to classify image and map taxonomy
def classify_image(
    image_input: Image.Image | None,
    image_url: str | None,
    model_size: str,
    beam_width: int,
):
    """Classify an image into the Amazon taxonomy and return the result table.

    Args:
        image_input: Uploaded image; takes priority over ``image_url``.
        image_url: URL to download the image from when no image was uploaded.
        model_size: One of ``"marqo-ecommerce-embeddings-L"``,
            ``"marqo-ecommerce-embeddings-B"`` or ``"openai-ViT-B-16"``.
        beam_width: Beam width forwarded to ``map_taxonomy``; coerced to an
            integer of at least 1.

    Returns:
        A DataFrame with one row per taxonomy level (columns ``Level``,
        ``Category``, ``Score``), or a single-column ``Error`` DataFrame when
        the inputs are unusable or the download fails.
    """
    if image_input is not None:
        image = image_input
    elif image_url:
        # NOTE(review): the URL is user-supplied and fetched from the server;
        # consider restricting target hosts if internal services are reachable.
        try:
            # A timeout keeps an unresponsive host from blocking the worker.
            response = requests.get(image_url, timeout=10)
            # Report HTTP failures directly rather than as an image-decoding
            # error on the error page's body.
            response.raise_for_status()
            image = Image.open(BytesIO(response.content)).convert("RGB")
        except Exception as e:
            # UI boundary: report any download/decoding problem to the user.
            return pd.DataFrame({"Error": [str(e)]})
    else:
        return pd.DataFrame(
            {
                "Error": [
                    "Please provide an image, an image URL, or select an example image"
                ]
            }
        )

    # The number widget may deliver a float or None (cleared field).
    try:
        beam_width = max(1, int(beam_width))
    except (TypeError, ValueError):
        return pd.DataFrame({"Error": ["Beam width must be a whole number"]})

    # Select the model, tokenizer, and preprocess
    if model_size == "marqo-ecommerce-embeddings-L":
        key = "hf-hub:Marqo/marqo-ecommerce-embeddings-L"
        model = model_L
        preprocess_val = preprocess_L
        tokenizer = tokenizer_L
    elif model_size == "marqo-ecommerce-embeddings-B":
        key = "hf-hub:Marqo/marqo-ecommerce-embeddings-B"
        model = model_B
        preprocess_val = preprocess_B
        tokenizer = tokenizer_B
    elif model_size == "openai-ViT-B-16":
        key = "ViT-B-16"
        model = model_base
        preprocess_val = preprocess_base
        tokenizer = tokenizer_base
    else:
        return pd.DataFrame({"Error": ["Invalid model size"]})

    path, _cumulative_score = map_taxonomy(
        base_image=image,
        taxonomy=AMAZON_TAXONOMY,
        model=model,
        tokenizer=tokenizer,
        preprocess_val=preprocess_val,
        cache_key=key,
        beam_width=beam_width,
    )

    rows = [
        {"Level": level, "Category": category, "Score": score}
        for level, (category, score) in enumerate(path, start=1)
    ]
    # Explicit columns keep the table header stable even when no path is found.
    return pd.DataFrame(rows, columns=["Level", "Category", "Score"])
# Gradio UI: image / URL / example inputs on the left, model options and the
# result table on the right.
# NOTE(review): the nesting of the components below was reconstructed from a
# copy of the file that had lost its indentation -- confirm the layout.
with gr.Blocks() as demo:
    gr.Markdown("# Image Classification with Taxonomy Mapping")
    gr.Markdown(
        "## How to use this app\n\nThis app compares Marqo's E-commerce embeddings to OpenAI's ViT-B-16 CLIP model for E-commerce taxonomy mapping. A beam search is used to find the correct classification in the taxonomy. The original OpenAI CLIP models perform very poorly on E-commerce data."
    )
    gr.Markdown(
        "Upload an image, provide an image URL, or select an example image, select the model size, and get the taxonomy mapping. The taxonomy is based on the Amazon product taxonomy."
    )
    with gr.Row():
        with gr.Column():
            image_input = gr.Image(type="pil", label="Upload Image", height=300)
            image_url_input = gr.Textbox(
                lines=1, placeholder="Image URL", label="Image URL"
            )
            gr.Markdown("### Or select an example image:")
            # Get example images from 'images' folder. Only regular files are
            # offered, and they are sorted so the gallery order is stable
            # (os.listdir returns entries in arbitrary order).
            example_images_folder = "images"
            example_image_paths = sorted(
                img_path
                for img_path in (
                    os.path.join(example_images_folder, img)
                    for img in os.listdir(example_images_folder)
                )
                if os.path.isfile(img_path)
            )
            gr.Examples(
                examples=[[img_path] for img_path in example_image_paths],
                inputs=image_input,
                label="Example Images",
                examples_per_page=100,
            )
        with gr.Column():
            model_size_input = gr.Radio(
                choices=[
                    "marqo-ecommerce-embeddings-L",
                    "marqo-ecommerce-embeddings-B",
                    "openai-ViT-B-16",
                ],
                label="Model",
                value="marqo-ecommerce-embeddings-L",
            )
            # precision=0 makes the component hand an int to the callback;
            # the beam width is used as a slice bound downstream.
            beam_width_input = gr.Number(
                label="Beam Width", value=5, minimum=1, step=1, precision=0
            )
            classify_button = gr.Button("Classify")
            output_table = gr.Dataframe(headers=["Level", "Category", "Score"])
    classify_button.click(
        fn=classify_image,
        inputs=[image_input, image_url_input, model_size_input, beam_width_input],
        outputs=output_table,
    )
demo.launch()