import json
import os
from collections import defaultdict
from io import BytesIO

import gradio as gr
import open_clip
import pandas as pd
import requests
import torch
from PIL import Image

# Load the Amazon taxonomy from a JSON file
with open("amazon.json", "r") as f:
    AMAZON_TAXONOMY = json.load(f)

# OpenAI CLIP baseline
base_model_name = "ViT-B-16"
model_base, _, preprocess_base = open_clip.create_model_and_transforms(base_model_name)
tokenizer_base = open_clip.get_tokenizer(base_model_name)

# Marqo e-commerce embedding models (B and L variants)
model_name_B = "hf-hub:Marqo/marqo-ecommerce-embeddings-B"
model_B, _, preprocess_B = open_clip.create_model_and_transforms(model_name_B)
tokenizer_B = open_clip.get_tokenizer(model_name_B)

model_name_L = "hf-hub:Marqo/marqo-ecommerce-embeddings-L"
model_L, _, preprocess_L = open_clip.create_model_and_transforms(model_name_L)
tokenizer_L = open_clip.get_tokenizer(model_name_L)

models = [base_model_name, model_name_B, model_name_L]

# Load the precomputed text embeddings for every taxonomy class,
# one cache file per model (e.g. "marqo-ecommerce-embeddings-L.json")
taxonomy_cache = {}
for model_name in models:
    with open(f'{model_name.split("/")[-1]}.json', "r") as f:
        taxonomy_cache[model_name] = json.load(f)


def cosine_similarity(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    """Cosine similarity rescaled from [-1, 1] to [0, 1]."""
    numerator = (a * b).sum(dim=-1)
    denominator = torch.linalg.norm(a, ord=2, dim=-1) * torch.linalg.norm(
        b, ord=2, dim=-1
    )
    return 0.5 * (numerator / denominator + 1.0)


class BeamPath:
    """A partial path through the taxonomy tracked during beam search."""

    def __init__(
        self, path: list, cumulative_score: float, current_layer: dict | list
    ):
        self.path = path  # list of (class_name, similarity) tuples
        self.cumulative_score = cumulative_score
        self.current_layer = current_layer  # subtree still to be descended

    def __repr__(self):
        return f"BeamPath(path={self.path}, cumulative_score={self.cumulative_score})"


def _compute_similarities(classes: list, base_embedding: torch.Tensor, cache_key: str):
    # Look up precomputed text embeddings rather than re-encoding the class
    # names on every call
    text_features = torch.tensor(
        [taxonomy_cache[cache_key][class_name] for class_name in classes]
    )
    similarities = cosine_similarity(base_embedding, text_features)
    return similarities.cpu().numpy()
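# The cache files loaded above are assumed to map every class name appearing
# in amazon.json to a precomputed text embedding for that model; a
# hypothetical snippet of "marqo-ecommerce-embeddings-L.json" would look like:
#
#     {
#         "Electronics": [0.0123, -0.0456, ...],
#         "Headphones": [0.0891, 0.0034, ...],
#         ...
#     }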
def map_taxonomy(
    base_image: Image.Image,
    taxonomy: dict,
    model,
    tokenizer,  # unused here; text embeddings come from taxonomy_cache
    preprocess_val,
    cache_key,
    beam_width: int = 3,
) -> tuple[list[tuple[str, float]], float]:
    """Beam-search the taxonomy tree for the best root-to-leaf path.

    Each taxonomy layer is either a dict (category -> subtree) or a list of
    final categories; an empty list or a non-container value marks a leaf.
    """
    image_tensor = preprocess_val(base_image).unsqueeze(0)
    # Autocast only when CUDA is available (avoids the deprecated
    # torch.cuda.amp.autocast entry point)
    with torch.no_grad(), torch.amp.autocast("cuda", enabled=torch.cuda.is_available()):
        base_embedding = model.encode_image(image_tensor, normalize=True)

    initial_path = BeamPath(path=[], cumulative_score=0.0, current_layer=taxonomy)
    beam = [initial_path]
    final_paths = []
    is_first = True

    while beam:
        candidates = []
        candidate_entries = []

        for beam_path in beam:
            layer = beam_path.current_layer
            if isinstance(layer, dict):
                classes = list(layer.keys())
            elif isinstance(layer, list):
                classes = layer
                if classes == []:
                    final_paths.append(beam_path)
                    continue
            else:
                # Leaf reached: nothing left to descend into
                final_paths.append(beam_path)
                continue

            for class_name in classes:
                candidate_string = class_name
                if isinstance(layer, dict):
                    next_layer = layer[class_name]
                else:
                    next_layer = None
                candidate_entries.append(
                    (candidate_string, class_name, beam_path, next_layer)
                )

        if not candidate_entries:
            break

        # Score all candidate extensions in one batch
        candidate_strings = [
            candidate_string for candidate_string, _, _, _ in candidate_entries
        ]
        similarities = _compute_similarities(
            candidate_strings, base_embedding, cache_key
        )

        for (candidate_string, class_name, beam_path, next_layer), similarity in zip(
            candidate_entries, similarities
        ):
            new_path = beam_path.path + [(class_name, float(similarity))]
            new_cumulative_score = beam_path.cumulative_score + similarity
            candidates.append(
                BeamPath(
                    path=new_path,
                    cumulative_score=new_cumulative_score,
                    current_layer=next_layer,
                )
            )

        # Keep up to beam_width candidates per top-level branch, ranked by
        # mean path score plus the score of the newest step
        by_parents = defaultdict(list)
        for candidate in candidates:
            by_parents[candidate.path[0][0]].append(candidate)

        beam = []
        for parent in by_parents:
            children = by_parents[parent]
            children.sort(
                key=lambda x: x.cumulative_score / len(x.path) + x.path[-1][1],
                reverse=True,
            )
            if is_first:
                # Keep every top-level branch alive after the first expansion
                beam.extend(children)
            else:
                beam.extend(children[:beam_width])

        is_first = False

    # Rank completed paths (plus whatever was left on the beam) by mean score
    all_paths = beam + final_paths
    if all_paths:
        all_paths.sort(key=lambda x: x.cumulative_score / len(x.path), reverse=True)
        best_path = all_paths[0]
        return best_path.path, float(best_path.cumulative_score)
    else:
        return [], 0.0


# Classify an image by mapping it onto the taxonomy
def classify_image(
    image_input: Image.Image | None,
    image_url: str | None,
    model_size: str,
    beam_width: int,
):
    if image_input is not None:
        image = image_input
    elif image_url:
        # Try to fetch the image from the URL
        try:
            response = requests.get(image_url, timeout=10)
            image = Image.open(BytesIO(response.content)).convert("RGB")
        except Exception as e:
            return pd.DataFrame({"Error": [str(e)]})
    else:
        return pd.DataFrame(
            {
                "Error": [
                    "Please provide an image, an image URL, or select an example image"
                ]
            }
        )

    # Select the model, tokenizer, and preprocessing transform
    if model_size == "marqo-ecommerce-embeddings-L":
        key = "hf-hub:Marqo/marqo-ecommerce-embeddings-L"
        model = model_L
        preprocess_val = preprocess_L
        tokenizer = tokenizer_L
    elif model_size == "marqo-ecommerce-embeddings-B":
        key = "hf-hub:Marqo/marqo-ecommerce-embeddings-B"
        model = model_B
        preprocess_val = preprocess_B
        tokenizer = tokenizer_B
    elif model_size == "openai-ViT-B-16":
        key = "ViT-B-16"
        model = model_base
        preprocess_val = preprocess_base
        tokenizer = tokenizer_base
    else:
        return pd.DataFrame({"Error": ["Invalid model size"]})

    path, cumulative_score = map_taxonomy(
        base_image=image,
        taxonomy=AMAZON_TAXONOMY,
        model=model,
        tokenizer=tokenizer,
        preprocess_val=preprocess_val,
        cache_key=key,
        beam_width=beam_width,
    )

    # One output row per taxonomy level
    output = []
    for idx, (category, score) in enumerate(path):
        output.append({"Level": idx + 1, "Category": category, "Score": score})

    return pd.DataFrame(output)
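# A minimal sketch of calling map_taxonomy directly, outside the UI
# (illustrative only; the file name and the scores shown are made up):
#
#     img = Image.open("example.jpg").convert("RGB")
#     path, score = map_taxonomy(
#         base_image=img,
#         taxonomy=AMAZON_TAXONOMY,
#         model=model_L,
#         tokenizer=tokenizer_L,
#         preprocess_val=preprocess_L,
#         cache_key="hf-hub:Marqo/marqo-ecommerce-embeddings-L",
#         beam_width=5,
#     )
#     # path -> [("Electronics", 0.81), ("Headphones", 0.79), ...]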
with gr.Blocks() as demo:
    gr.Markdown("# Image Classification with Taxonomy Mapping")
    gr.Markdown(
        "## How to use this app\n\n"
        "This app compares Marqo's E-commerce embeddings to OpenAI's ViT-B-16 "
        "CLIP model for E-commerce taxonomy mapping. A beam search is used to "
        "find the best classification path in the taxonomy. The original "
        "OpenAI CLIP models perform very poorly on E-commerce data."
    )
    gr.Markdown(
        "Upload an image, provide an image URL, or select an example image; "
        "then pick a model and get the taxonomy mapping. The taxonomy is "
        "based on the Amazon product taxonomy."
    )
    with gr.Row():
        with gr.Column():
            image_input = gr.Image(type="pil", label="Upload Image", height=300)
            image_url_input = gr.Textbox(
                lines=1, placeholder="Image URL", label="Image URL"
            )
            gr.Markdown("### Or select an example image:")
            # Collect example images from the 'images' folder (sorted for a
            # deterministic gallery order)
            example_images_folder = "images"
            example_image_paths = [
                os.path.join(example_images_folder, img)
                for img in sorted(os.listdir(example_images_folder))
            ]
            gr.Examples(
                examples=[[img_path] for img_path in example_image_paths],
                inputs=image_input,
                label="Example Images",
                examples_per_page=100,
            )
        with gr.Column():
            model_size_input = gr.Radio(
                choices=[
                    "marqo-ecommerce-embeddings-L",
                    "marqo-ecommerce-embeddings-B",
                    "openai-ViT-B-16",
                ],
                label="Model",
                value="marqo-ecommerce-embeddings-L",
            )
            beam_width_input = gr.Number(
                label="Beam Width", value=5, minimum=1, step=1
            )
            classify_button = gr.Button("Classify")
            output_table = gr.Dataframe(headers=["Level", "Category", "Score"])

    classify_button.click(
        fn=classify_image,
        inputs=[image_input, image_url_input, model_size_input, beam_width_input],
        outputs=output_table,
    )

demo.launch()
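# Note: demo.launch() above serves locally (http://127.0.0.1:7860 by default);
# Gradio also supports demo.launch(share=True) for a temporary public link.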