import hashlib
import requests
import json
import re
import os
from datetime import datetime

from server import PromptServer

import folder_paths

from ..utils import get_dict_value, load_json_file, path_exists, save_json_file
from ..utils_userdata import read_userdata_json, save_userdata_json, delete_userdata_file


def _get_info_cache_file(file_hash: str, data_type: str):
  """Builds the userdata-relative path for a cached info file, keyed by the model's hash."""
  return f'info/{file_hash}.{data_type}.json'


async def delete_model_info(file: str,
                            model_type,
                            del_info=True,
                            del_metadata=True,
                            del_civitai=True):
  """Deletes the info json, and the civitai & metadata caches."""
  file_path = get_folder_path(file, model_type)
  if file_path is None:
    return
  if del_info:
    try_info_path = f'{file_path}.rgthree-info.json'
    if os.path.isfile(try_info_path):
      os.remove(try_info_path)
  if del_civitai or del_metadata:
    file_hash = _get_sha256_hash(file_path)
    if del_civitai:
      json_file_path = _get_info_cache_file(file_hash, 'civitai')
      delete_userdata_file(json_file_path)
    if del_metadata:
      json_file_path = _get_info_cache_file(file_hash, 'metadata')
      delete_userdata_file(json_file_path)

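
# A quick illustration of the cache layout produced above: caches are keyed by the
# model file's sha256, so renaming or moving the model file won't orphan them. With a
# hypothetical hash of 'abc123':
#
#   _get_info_cache_file('abc123', 'civitai')   # -> 'info/abc123.civitai.json'
#   _get_info_cache_file('abc123', 'metadata')  # -> 'info/abc123.metadata.json'
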
async def get_model_info(file: str,
                         model_type,
                         default=None,
                         maybe_fetch_civitai=False,
                         force_fetch_civitai=False,
                         maybe_fetch_metadata=False,
                         force_fetch_metadata=False,
                         light=False):
  """Compiles a model info given a stored file next to the model, and/or metadata/civitai."""
  file_path = get_folder_path(file, model_type)
  if file_path is None:
    return default

  info_data = {}
  should_save = False

  # Try to load a rgthree-info.json file next to the file.
  try_info_path = f'{file_path}.rgthree-info.json'
  if path_exists(try_info_path):
    info_data = load_json_file(try_info_path)

  if 'file' not in info_data:
    info_data['file'] = file
    should_save = True
  if 'path' not in info_data:
    info_data['path'] = file_path
    should_save = True

  # Check if we have an image next to the file and, if so, add it to the front of the images
  # (if it isn't already).
  img_next_to_file = None
  for ext in ['jpg', 'png', 'jpeg']:
    try_path = f'{os.path.splitext(file_path)[0]}.{ext}'
    if path_exists(try_path):
      img_next_to_file = try_path
      break
  if 'images' not in info_data:
    info_data['images'] = []
    should_save = True
  if img_next_to_file:
    img_next_to_file_url = f'/rgthree/api/loras/img?file={file}'
    if len(info_data['images']) == 0 or info_data['images'][0]['url'] != img_next_to_file_url:
      info_data['images'].insert(0, {'url': img_next_to_file_url})
      should_save = True

  # If we just want light data, then bail now with the existing data, plus file, path and any
  # image next to the file.
  if (light and not maybe_fetch_metadata and not force_fetch_metadata and
      not maybe_fetch_civitai and not force_fetch_civitai):
    return info_data

  if 'raw' not in info_data:
    info_data['raw'] = {}
    should_save = True

  should_save = _update_data(info_data) or should_save

  should_fetch_civitai = force_fetch_civitai is True or (maybe_fetch_civitai is True and
                                                         'civitai' not in info_data['raw'])
  should_fetch_metadata = force_fetch_metadata is True or (maybe_fetch_metadata is True and
                                                           'metadata' not in info_data['raw'])

  if should_fetch_metadata:
    data_meta = _get_model_metadata(file, model_type, default={}, refresh=force_fetch_metadata)
    should_save = _merge_metadata(info_data, data_meta) or should_save

  if should_fetch_civitai:
    data_civitai = _get_model_civitai_data(file,
                                           model_type,
                                           default={},
                                           refresh=force_fetch_civitai)
    should_save = _merge_civitai_data(info_data, data_civitai) or should_save

  if 'sha256' not in info_data:
    file_hash = _get_sha256_hash(file_path)
    if file_hash is not None:
      info_data['sha256'] = file_hash
      should_save = True

  if should_save:
    if 'trainedWords' in info_data:
      # Sort by count; if one doesn't exist, then assume it's a top item from civitai or elsewhere.
      info_data['trainedWords'] = sorted(info_data['trainedWords'],
                                         key=lambda w: w['count'] if 'count' in w else 99999,
                                         reverse=True)
    save_model_info(file, info_data, model_type)
    # If we're saving, then the UI is likely waiting to see if the refreshed data is coming in.
    await PromptServer.instance.send("rgthree-refreshed-lora-info", {"data": info_data})

  return info_data


def _update_data(info_data: dict) -> bool:
  """Ports old data to new data if necessary. Returns True if data changed."""
  should_save = False
  # If we have "triggerWords" then move them over to "trainedWords".
  if 'triggerWords' in info_data and len(info_data['triggerWords']) > 0:
    civitai_words = ','.join(
      get_dict_value(info_data, 'raw.civitai.triggerWords', default=[]) +
      get_dict_value(info_data, 'raw.civitai.trainedWords', default=[]))
    if 'trainedWords' not in info_data:
      info_data['trainedWords'] = []
    for trigger_word in info_data['triggerWords']:
      word_data = next(
        (data for data in info_data['trainedWords'] if data['word'] == trigger_word), None)
      if word_data is None:
        word_data = {'word': trigger_word}
        info_data['trainedWords'].append(word_data)
      if trigger_word in civitai_words:
        word_data['civitai'] = True
      else:
        word_data['user'] = True
    del info_data['triggerWords']
    should_save = True
  return should_save

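
# An illustrative before/after for the `_update_data` port above, with hypothetical
# values. Words found in the cached civitai lists are flagged 'civitai'; anything
# else is assumed to be user-entered and flagged 'user':
#
#   before: {'triggerWords': ['red hair'],
#            'raw': {'civitai': {'trainedWords': ['red hair']}}}
#   after:  {'trainedWords': [{'word': 'red hair', 'civitai': True}],
#            'raw': {'civitai': {'trainedWords': ['red hair']}}}
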
def _merge_metadata(info_data: dict, data_meta: dict) -> bool:
  """Merges file metadata into the info data. Returns True if data should be saved."""
  should_save = False

  base_model_file = get_dict_value(data_meta, 'ss_sd_model_name', None)
  if base_model_file:
    info_data['baseModelFile'] = base_model_file

  # Loop over the metadata tag frequencies and collect trained words with their counts.
  trained_words = {}
  if 'ss_tag_frequency' in data_meta and isinstance(data_meta['ss_tag_frequency'], dict):
    for bucket_value in data_meta['ss_tag_frequency'].values():
      if isinstance(bucket_value, dict):
        for tag, count in bucket_value.items():
          if tag not in trained_words:
            trained_words[tag] = {'word': tag, 'count': 0, 'metadata': True}
          trained_words[tag]['count'] = trained_words[tag]['count'] + count

  if 'trainedWords' not in info_data:
    info_data['trainedWords'] = list(trained_words.values())
    should_save = True
  else:
    # We can't simply overwrite, because the existing list may carry other data, like flags
    # from the civitai data. Merge by word instead.
    merged_dict = {}
    for existing_word_data in info_data['trainedWords']:
      merged_dict[existing_word_data['word']] = existing_word_data
    for new_key, new_word_data in trained_words.items():
      if new_key not in merged_dict:
        merged_dict[new_key] = {}
      merged_dict[new_key] = {**merged_dict[new_key], **new_word_data}
    info_data['trainedWords'] = list(merged_dict.values())
    should_save = True

  info_data['raw']['metadata'] = data_meta
  should_save = True

  if 'sha256' not in info_data and '_sha256' in data_meta:
    info_data['sha256'] = data_meta['_sha256']
    should_save = True

  return should_save

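
# An illustrative shape for the `ss_tag_frequency` metadata consumed above, with
# hypothetical values: a dict of dataset buckets, each mapping tag -> count, which
# `_merge_metadata` sums across buckets:
#
#   {'ss_tag_frequency': {'512x512': {'red hair': 10}, '768x768': {'red hair': 4}}}
#   # -> trainedWords gains {'word': 'red hair', 'count': 14, 'metadata': True}
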
def _merge_civitai_data(info_data: dict, data_civitai: dict) -> bool:
  """Merges civitai data into the info data. Returns True if data should be saved."""
  should_save = False

  if 'name' not in info_data:
    info_data['name'] = get_dict_value(data_civitai, 'model.name', '')
    should_save = True
    version_name = get_dict_value(data_civitai, 'name')
    if version_name is not None:
      info_data['name'] += f' - {version_name}'

  if 'type' not in info_data:
    info_data['type'] = get_dict_value(data_civitai, 'model.type')
    should_save = True

  if 'baseModel' not in info_data:
    info_data['baseModel'] = get_dict_value(data_civitai, 'baseModel')
    should_save = True

  # We always want to merge the trigger words.
  civitai_trigger = get_dict_value(data_civitai, 'triggerWords', default=[])
  civitai_trained = get_dict_value(data_civitai, 'trainedWords', default=[])
  civitai_words = ','.join(civitai_trigger + civitai_trained)
  if civitai_words:
    # Normalize to a single comma-delimited list with no surrounding whitespace or empty items.
    civitai_words = re.sub(r"\s*,\s*", ",", civitai_words)
    civitai_words = re.sub(r",+", ",", civitai_words)
    civitai_words = re.sub(r"^,", "", civitai_words)
    civitai_words = re.sub(r",$", "", civitai_words)
  if civitai_words:
    civitai_words = civitai_words.split(',')
    if 'trainedWords' not in info_data:
      info_data['trainedWords'] = []
    for trigger_word in civitai_words:
      word_data = next(
        (data for data in info_data['trainedWords'] if data['word'] == trigger_word), None)
      if word_data is None:
        word_data = {'word': trigger_word}
        info_data['trainedWords'].append(word_data)
      word_data['civitai'] = True

  if 'sha256' not in info_data and '_sha256' in data_civitai:
    info_data['sha256'] = data_civitai['_sha256']
    should_save = True

  if 'modelId' in data_civitai:
    info_data['links'] = info_data['links'] if 'links' in info_data else []
    civitai_link = f'https://civitai.com/models/{get_dict_value(data_civitai, "modelId")}'
    if get_dict_value(data_civitai, "id"):
      civitai_link += f'?modelVersionId={get_dict_value(data_civitai, "id")}'
    info_data['links'].append(civitai_link)
    info_data['links'].append(data_civitai['_civitai_api'])
    should_save = True

  # Take images from civitai that we don't already have.
  if 'images' in data_civitai:
    info_data_image_urls = list(
      map(lambda i: i['url'] if 'url' in i else None, info_data['images']))
    for img in data_civitai['images']:
      img_url = get_dict_value(img, 'url')
      if img_url is not None and img_url not in info_data_image_urls:
        img_id = os.path.splitext(os.path.basename(img_url))[0]
        img_data = {
          'url': img_url,
          'civitaiUrl': f'https://civitai.com/images/{img_id}' if img_id else None,
          'width': get_dict_value(img, 'width'),
          'height': get_dict_value(img, 'height'),
          'type': get_dict_value(img, 'type'),
          'nsfwLevel': get_dict_value(img, 'nsfwLevel'),
          'seed': get_dict_value(img, 'meta.seed'),
          'positive': get_dict_value(img, 'meta.prompt'),
          'negative': get_dict_value(img, 'meta.negativePrompt'),
          'steps': get_dict_value(img, 'meta.steps'),
          'sampler': get_dict_value(img, 'meta.sampler'),
          'cfg': get_dict_value(img, 'meta.cfgScale'),
          'model': get_dict_value(img, 'meta.Model'),
          'resources': get_dict_value(img, 'meta.resources'),
        }
        info_data['images'].append(img_data)
        should_save = True

  # The raw data.
  if 'civitai' not in info_data['raw']:
    info_data['raw']['civitai'] = data_civitai
    should_save = True

  return should_save


def _get_model_civitai_data(file: str, model_type, default=None, refresh=False):
  """Gets the civitai data, either cached from the user directory, or from the civitai api."""
  file_hash = _get_sha256_hash(get_folder_path(file, model_type))
  if file_hash is None:
    return default
  json_file_path = _get_info_cache_file(file_hash, 'civitai')
  api_url = f'https://civitai.com/api/v1/model-versions/by-hash/{file_hash}'
  file_data = read_userdata_json(json_file_path)
  if file_data is None or refresh is True:
    try:
      # Note, requests timeouts are in seconds.
      response = requests.get(api_url, timeout=30)
      data = response.json()
      save_userdata_json(json_file_path, {
        'url': api_url,
        'timestamp': datetime.now().timestamp(),
        'response': data
      })
      file_data = read_userdata_json(json_file_path)
    except requests.exceptions.RequestException as e:
      print(e)
  response = file_data['response'] if file_data is not None and 'response' in file_data else None
  if response is not None:
    response['_sha256'] = file_hash
    response['_civitai_api'] = api_url
  return response if response is not None else default


def _get_model_metadata(file: str, model_type, default=None, refresh=False):
  """Gets the metadata from the file itself, cached in the user directory."""
  file_path = get_folder_path(file, model_type)
  file_hash = _get_sha256_hash(file_path)
  if file_hash is None:
    return default
  json_file_path = _get_info_cache_file(file_hash, 'metadata')
  file_data = read_userdata_json(json_file_path)
  if file_data is None or refresh is True:
    data = _read_file_metadata_from_header(file_path)
    if data is not None:
      file_data = {'url': file, 'timestamp': datetime.now().timestamp(), 'response': data}
      save_userdata_json(json_file_path, file_data)
  response = file_data['response'] if file_data is not None and 'response' in file_data else None
  if response is not None:
    response['_sha256'] = file_hash
  return response if response is not None else default


def _read_file_metadata_from_header(file_path: str) -> dict:
  """Reads the file's header and returns a JSON dict of metadata, if available."""
  data = None
  try:
    if file_path.endswith('.safetensors'):
      with open(file_path, "rb") as file:
        # https://github.com/huggingface/safetensors#format
        # 8 bytes: N, an unsigned little-endian 64-bit integer, containing the size of the header.
        header_size = int.from_bytes(file.read(8), "little", signed=False)
        if header_size <= 0:
          raise BufferError("Invalid header size")
        header = file.read(header_size)
        if header is None:
          raise BufferError("Invalid header")
        header_json = json.loads(header)
        data = header_json["__metadata__"] if "__metadata__" in header_json else None
        if data is not None:
          # Some metadata values are themselves JSON-encoded strings; parse them when possible.
          for key, value in data.items():
            if isinstance(value, str) and value.startswith('{') and value.endswith('}'):
              try:
                value_as_json = json.loads(value)
                data[key] = value_as_json
              except Exception:
                print(f'metadata for field {key} did not parse as json')
  except Exception as e:
    print(e)
    data = None
  return data


def get_folder_path(file: str, model_type):
  """Gets the file path, ensuring it exists."""
  file_path = folder_paths.get_full_path(model_type, file)
  if file_path and not path_exists(file_path):
    file_path = os.path.abspath(file_path)
  if not path_exists(file_path):
    file_path = None
  return file_path

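
# A standalone sketch of the safetensors header read performed by
# `_read_file_metadata_from_header` above; the path is a placeholder:
#
#   import json
#   with open('/path/to/model.safetensors', 'rb') as f:
#     header_size = int.from_bytes(f.read(8), 'little', signed=False)
#     header = json.loads(f.read(header_size))
#     print(header.get('__metadata__'))  # None if the file carries no training metadata.
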
def _get_sha256_hash(file_path: str):
  """Returns the sha256 hash for the file."""
  if not file_path or not path_exists(file_path):
    return None
  file_hash = None
  sha256_hash = hashlib.sha256()
  with open(file_path, "rb") as f:
    # Read and update the hash in blocks of 4K to avoid loading the whole file into memory.
    for byte_block in iter(lambda: f.read(4096), b""):
      sha256_hash.update(byte_block)
    file_hash = sha256_hash.hexdigest()
  return file_hash


async def set_model_info_partial(file: str, model_type: str, info_data_partial):
  """Sets partial data into the existing model info data."""
  info_data = await get_model_info(file, model_type, default={})
  info_data = {**info_data, **info_data_partial}
  save_model_info(file, info_data, model_type)


def save_model_info(file: str, info_data, model_type):
  """Saves the model info alongside the model itself."""
  file_path = get_folder_path(file, model_type)
  if file_path is None:
    return
  try_info_path = f'{file_path}.rgthree-info.json'
  save_json_file(try_info_path, info_data)
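
# Example usage from an async context (illustrative values; these helpers expect to run
# inside ComfyUI, where `folder_paths` and `PromptServer` are available):
#
#   info = await get_model_info('some/lora.safetensors', 'loras',
#                               maybe_fetch_metadata=True, maybe_fetch_civitai=True)
#   await set_model_info_partial('some/lora.safetensors', 'loras', {'name': 'My Lora'})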