import timm import torch from PIL import Image from timm.utils import ParseKwargs from timm.data.constants import IMAGENET_DEFAULT_MEAN, IMAGENET_DEFAULT_STD, DEFAULT_CROP_PCT # from transformers.utils import logging # logging.set_verbosity_info() # logger = logging.get_logger("transformers") ### import os import time from contextlib import suppress from functools import partial import numpy as np import pandas as pd import torch from timm.data import create_dataset, create_loader, resolve_data_config, ImageNetInfo, infer_imagenet_subset from timm.layers import apply_test_time_pool from timm.models import create_model from timm.utils import AverageMeter, setup_default_logging, set_jit_fuser, ParseKwargs try: from apex import amp has_apex = True except ImportError: has_apex = False has_native_amp = False try: if getattr(torch.cuda.amp, 'autocast') is not None: has_native_amp = True except AttributeError: pass # try: # from functorch.compile import memory_efficient_fusion # has_functorch = True # except ImportError as e: # has_functorch = False has_compile = hasattr(torch, 'compile') import PIL import requests import io import base64 # ImageFile.LOAD_TRUNCATED_IMAGES = True ### def transform_percentage(old_value): threshold = 0.7 desired_range_end = 1 desired_range_start = 0 k = 20 # You can adjust this parameter new_value = (desired_range_end - desired_range_start) * (1 / (1 + np.exp(-k * (old_value - threshold)))) + desired_range_start return new_value class EndpointHandler(): def __init__(self, path=""): self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') if torch.cuda.is_available(): torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.benchmark = True # May sacrifice a bit of accuracy, depending on our needs assert has_native_amp, 'Please update PyTorch to a version with native AMP (or use APEX).' amp_dtype = torch.float16 amp_autocast = partial(torch.autocast, device_type=self.device.type, dtype=amp_dtype) # data_config = resolve_data_config(vars(args), model=model) self.aiGeneratorModel = timm.create_model('eva02_base_patch14_448.mim_in22k_ft_in22k_in1k', num_classes=4, in_chans=3, checkpoint_path=path + '/model_best_v6.pth-52b1d7bb.pth') # self.aiArtModel = timm.create_model('eva02_base_patch14_448.mim_in22k_ft_in22k_in1k', num_classes=3, in_chans=3, checkpoint_path=path + '/AIArtDetector.pth-af59f7fa.pth') self.aiArtModel = timm.create_model('eva02_large_patch14_448.mim_m38m_ft_in22k_in1k', num_classes=2, in_chans=3, checkpoint_path=path + '/modelv3.pth') self.aiGeneratorModel = self.aiGeneratorModel.to(self.device) self.aiArtModel = self.aiArtModel.to(self.device) self.aiGeneratorModel.eval() self.aiArtModel.eval() self.transform = timm.data.create_transform(input_size=(3, 448, 448), is_training=False, use_prefetcher=False, no_aug=False, scale=None, ratio=None, hflip=0, vflip=0., color_jitter=0, auto_augment=None, interpolation='bicubic', # mean=(0.5, 0.5, 0.5), # std=(0.5, 0.5, 0.5), re_prob=0., re_mode='const', re_count=1, re_num_splits=0, crop_pct=1.0, # crop_mode='center', crop_mode='squash', tf_preprocessing=False, separate=False) # assert has_compile, 'A version of torch w/ torch.compile() is required for --compile, possibly a nightly.' # torch._dynamo.reset() # model = torch.compile(model, backend=args.torchcompile) self.supported_formats = ["JPEG", "PNG", "BMP", "TIFF", "WEBP", "RAW"] #GIF requires its own special implementation to get its frames self.label_map = {0: 'Dall-E 2', 1: 'Dall-E 3', 2: 'Midjourney', 3: 'Stable Diffusion'} # self.ai_label_map = {0: "AI", 1: "Non-AI", 2: "Null"} self.ai_label_map = {0: "AI", 1: "Non-AI"} print("initialized handler.py successfully") def __call__(self, data): """ data args: inputs: Dict[str, Any] Return: A :obj:`list` | `dict`: will be serialized and returned """ inputs = data.pop("inputs") if not isinstance(inputs, list): inputs = [inputs] if len(inputs) > 10: return {'error': 'Exceeds max limit of images (50)'} print(inputs) image_paths = inputs #['https://google_image.png', ''] batch_size = 1 # Set your desired batch size, currently a small part of the code I wrote doesn't support batch_size > 1. results = {} for i in range(0, len(image_paths), batch_size): # For each batch batch_paths = image_paths[i:i+batch_size] validUrls = [] batch_images = [] # time1 = time.time() for j, src in enumerate(batch_paths): # Get all valid images open and inputted in batch_images try: if isinstance(src, str): # Image.open(batch_paths[j]).load() # Tests if image is okay to run inference on. pos = src.find("base64") if pos != -1: # Assuming base64_str is the string value without 'data:image/jpeg;base64,' new = Image.open(io.BytesIO(base64.decodebytes(bytes(src[pos+7:], "utf-8")))).convert("RGB") new.load() width, height = new.size if (width < 250 or height < 250): results[src] = {'error': 'Image quality not high enough'} raise Exception('Image quality not high enough') # new.load() Necessary? Does this catch any edge cases? Without this, we don't actually load the image pixels. batch_images.append(new) validUrls.append(src) else: try: # r = requests.get(src, stream=True) # r.raw.decode_content = True # new = Image.open(r.raw).convert("RGB") # new = Image.open(urlopen(src)) headers = { 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36' } # t0 = time.time() r = requests.get(src, headers=headers) new = Image.open(io.BytesIO(r.content)).convert("RGB") new.load() width, height = new.size if (width < 250 or height < 250): results[src] = {'error': 'Image quality not high enough'} raise Exception('Image quality not high enough') # new.load() # print('none base 64 image') # print(t1 - t0) batch_images.append(new) validUrls.append(src) except Exception as e: if e.args == ('Image quality not high enough',): results[src] = {'error': 'Image quality not high enough'} else: results[src] = {'error': 'Failed to process image'} print(e) # invalid_indices.append(j) continue # batch_images.append(batch_paths[j]) elif isinstance(src, PIL.Image.Image): # Handle PIL Image objects new = src.convert("RGB") new.load() width, height = new.size if width < 250 or height < 250: print("OKERROR") results[str(j)] = {'error': 'Image quality not high enough'} print("ERROR AFTER") raise Exception('Image quality not high enough') batch_images.append(new) validUrls.append(str(i)+str(j)) except Exception as e: if e.args == ('Image quality not high enough',): results[src] = {'error': 'Image quality not high enough'} else: results[src] = {'error': 'Failed to process image w/ base64 in url'} print(e) continue # logger.INFO(str(time2-time1)) # tensor = self.transform(img).to(torch.device('cuda' if torch.cuda.is_available() else 'cpu')) for img in batch_images unsqueeze 0 print(batch_images) thing = [self.transform(img).to(torch.device('cuda' if torch.cuda.is_available() else 'cpu')) for img in batch_images] # time2 = time.time() if len(thing) > 0: batch_tensors = torch.stack(thing) is_AI = [] with torch.no_grad(): aiOutput = self.aiArtModel(batch_tensors) for k, tensor in enumerate(aiOutput): output = tensor.softmax(-1) # output, indice = output.topk(3) output, indice = output.topk(2) labels = [self.ai_label_map[x] for x in indice.cpu().numpy().tolist()] #numpy()[0] probabilities = [round(i * 100, 2) for i in output.cpu().numpy().tolist()] # numpy()[0] ai_prob = probabilities[labels.index("AI")] ai_prob = round(transform_percentage(ai_prob/100) * 100, 2) if ai_prob > 50: is_AI.append(k) #this arch only works for non-batching output1 = self.aiGeneratorModel(batch_tensors) for k, tensor in enumerate(output1): output = tensor.softmax(-1) output, indice = output.topk(4) labels = [self.label_map[x] for x in indice.cpu().numpy().tolist()] probabilities = [round(i * 100, 2) for i in output.cpu().numpy().tolist()] # time3 = time.time() single_res = {"ai_probability": ai_prob, 'probabilities': probabilities, 'indices': labels} results[validUrls[k]] = single_res else: # time3 = time.time() res = {"ai_probability": ai_prob} results[validUrls[k]] = res return results # handler = EndpointHandler() # handler.__call__({'inputs': ['https://www.ef.edu/', 'foewfoewjf','','https://images.nightcafe.studio//assets/tdraw-girl.jpg?tr=w-1200,c-at_max','https://www.searchenginejournal.com/wp-content/uploads/2022/06/image-search-1600-x-840-px-62c6dc4ff1eee-sej.png', 'https://www.searchenginejournal.com/wp-content/uploads/2022/06/image-search-1600-x-840-px-62c6dc4ff1eee-sej.png', 'https://www.searchenginejournal.com/wp-content/uploads/2022/06/image-search-1600-x-840-px-62c6dc4ff1eee-sej.png', 'https://www.searchenginejournal.com/wp-content/uploads/2022/06/image-search-1600-x-840-px-62c6dc4ff1eee-sej.png']})