# NOTE: page-scrape banner ("Spaces: Running on L40S") -- not part of the module source.
import hashlib | |
import requests | |
import json | |
import re | |
import os | |
from datetime import datetime | |
from server import PromptServer | |
import folder_paths | |
from ..utils import get_dict_value, load_json_file, path_exists, save_json_file | |
from ..utils_userdata import read_userdata_json, save_userdata_json, delete_userdata_file | |
def _get_info_cache_file(data_type: str, file_hash: str):
    """Build the relative cache file name ``info/<file_hash>.<data_type>.json``.

    NOTE: the callers in this module pass ``(file_hash, 'civitai' | 'metadata')`` positionally,
    i.e. swapped relative to the parameter names, so the names produced in practice look like
    ``info/civitai.<hash>.json``. Changing either side would orphan existing cache files.
    """
    return 'info/{}.{}.json'.format(file_hash, data_type)
async def delete_model_info(file: str,
                            model_type,
                            del_info=True,
                            del_metadata=True,
                            del_civitai=True):
    """Remove the stored info for a model: the sidecar json and/or its cached lookups.

    Does nothing when the model file cannot be resolved. The sidecar
    (``<model path>.rgthree-info.json``) is removed when ``del_info`` is set; the civitai and
    metadata cache entries, which are keyed by the model's sha256, are removed when
    ``del_civitai`` / ``del_metadata`` are set.
    """
    model_path = get_folder_path(file, model_type)
    if model_path is None:
        return

    if del_info:
        sidecar_path = f'{model_path}.rgthree-info.json'
        if os.path.isfile(sidecar_path):
            os.remove(sidecar_path)

    if not (del_civitai or del_metadata):
        return

    # Only hash the (potentially large) model file when a cache entry actually has to go.
    model_hash = _get_sha256_hash(model_path)
    requested = (('civitai', del_civitai), ('metadata', del_metadata))
    for cache_kind in [kind for kind, wanted in requested if wanted]:
        delete_userdata_file(_get_info_cache_file(model_hash, cache_kind))
async def get_model_info(file: str,
                         model_type,
                         default=None,
                         maybe_fetch_civitai=False,
                         force_fetch_civitai=False,
                         maybe_fetch_metadata=False,
                         force_fetch_metadata=False,
                         light=False):
    """Compile the info dict for a model from its sidecar json and/or metadata/civitai data.

    Args:
      file: The model file name, as passed to `get_folder_path`.
      model_type: The model folder type, as passed to `get_folder_path`.
      default: Returned when the model file cannot be resolved.
      maybe_fetch_civitai: Fetch civitai data only if the info has no `raw.civitai` entry yet.
      force_fetch_civitai: Always fetch civitai data (and refresh its cache).
      maybe_fetch_metadata: Read file metadata only if the info has no `raw.metadata` entry yet.
      force_fetch_metadata: Always read the file metadata (and refresh its cache).
      light: When set and no fetch flag is set, return right after the cheap local fields
        (file, path, images) are filled in; nothing is saved in that case.

    Returns:
      The info dict, or `default` when the model file was not found.

    When anything changed, the info is written back next to the model and a
    "rgthree-refreshed-lora-info" message carrying the data is sent through PromptServer.
    """
    file_path = get_folder_path(file, model_type)
    if file_path is None:
        return default

    info_data = {}
    should_save = False

    # Try to load a rgthree-info.json file next to the file.
    try_info_path = f'{file_path}.rgthree-info.json'
    if path_exists(try_info_path):
        # NOTE(review): assumes load_json_file returns a dict here; confirm what it returns for
        # an unparsable file.
        info_data = load_json_file(try_info_path)

    if 'file' not in info_data:
        info_data['file'] = file
        should_save = True
    if 'path' not in info_data:
        info_data['path'] = file_path
        should_save = True

    # Check if we have an image next to the file and, if so, add it to the front of the images
    # (if it isn't already).
    file_stem = os.path.splitext(file_path)[0]
    has_img_next_to_file = any(
        path_exists(f'{file_stem}.{ext}') for ext in ('jpg', 'png', 'jpeg'))

    if 'images' not in info_data:
        info_data['images'] = []
        should_save = True

    if has_img_next_to_file:
        # NOTE(review): the route is hard-coded to "loras" regardless of model_type; confirm.
        img_next_to_file_url = f'/rgthree/api/loras/img?file={file}'
        images = info_data['images']
        first_image = images[0] if images else None
        # Stored images are not guaranteed to carry a 'url' (the civitai merge tolerates that
        # too), so read it defensively rather than raising a KeyError.
        first_url = first_image.get('url') if isinstance(first_image, dict) else None
        if first_url != img_next_to_file_url:
            images.insert(0, {'url': img_next_to_file_url})
            should_save = True

    # If we just want light data then bail now with just existing data, plus file, path and img
    # if next to the file.
    wants_fetch = (maybe_fetch_metadata or force_fetch_metadata or maybe_fetch_civitai or
                   force_fetch_civitai)
    if light and not wants_fetch:
        return info_data

    if 'raw' not in info_data:
        info_data['raw'] = {}
        should_save = True

    should_save = _update_data(info_data) or should_save

    should_fetch_civitai = force_fetch_civitai is True or (maybe_fetch_civitai is True and
                                                           'civitai' not in info_data['raw'])
    should_fetch_metadata = force_fetch_metadata is True or (maybe_fetch_metadata is True and
                                                             'metadata' not in info_data['raw'])

    if should_fetch_metadata:
        data_meta = _get_model_metadata(file, model_type, default={}, refresh=force_fetch_metadata)
        should_save = _merge_metadata(info_data, data_meta) or should_save

    if should_fetch_civitai:
        data_civitai = _get_model_civitai_data(file,
                                               model_type,
                                               default={},
                                               refresh=force_fetch_civitai)
        should_save = _merge_civitai_data(info_data, data_civitai) or should_save

    if 'sha256' not in info_data:
        file_hash = _get_sha256_hash(file_path)
        if file_hash is not None:
            info_data['sha256'] = file_hash
            should_save = True

    if should_save:
        if 'trainedWords' in info_data:
            # Sort by count; if it doesn't exist, then assume it's a top item from civitai or
            # elsewhere.
            info_data['trainedWords'] = sorted(info_data['trainedWords'],
                                               key=lambda w: w['count'] if 'count' in w else 99999,
                                               reverse=True)
        save_model_info(file, info_data, model_type)

        # If we're saving, then the UI is likely waiting to see if the refreshed data is coming in.
        await PromptServer.instance.send("rgthree-refreshed-lora-info", {"data": info_data})

    return info_data
def _update_data(info_data: dict) -> bool:
    """Port legacy "triggerWords" entries into "trainedWords"; return True if anything moved.

    Each legacy word is matched to (or appended as) a `{'word': ...}` entry in "trainedWords" and
    flagged as 'civitai' or 'user'; the "triggerWords" key is then removed.
    """
    if 'triggerWords' not in info_data or len(info_data['triggerWords']) == 0:
        return False

    civitai_joined = ','.join(
        get_dict_value(info_data, 'raw.civitai.triggerWords', default=[]) +
        get_dict_value(info_data, 'raw.civitai.trainedWords', default=[]))

    if 'trainedWords' not in info_data:
        info_data['trainedWords'] = []
    trained = info_data['trainedWords']

    for legacy_word in info_data['triggerWords']:
        entry = None
        for candidate in trained:
            if candidate['word'] == legacy_word:
                entry = candidate
                break
        if entry is None:
            entry = {'word': legacy_word}
            trained.append(entry)
        # NOTE(review): this is a substring test against the comma-joined civitai words, so a
        # word that is merely part of a longer civitai word is also flagged as 'civitai'.
        origin = 'civitai' if legacy_word in civitai_joined else 'user'
        entry[origin] = True

    del info_data['triggerWords']
    return True
def _merge_metadata(info_data: dict, data_meta: dict) -> bool:
    """Fold the model file's embedded metadata into `info_data`; returns True (needs saving).

    Sets "baseModelFile" from `ss_sd_model_name`, sums the per-bucket tag counts found in
    `ss_tag_frequency` into "trainedWords" entries, stores the metadata under `raw.metadata` and
    fills "sha256" from `_sha256` when it is still missing. Because `raw.metadata` is always
    rewritten, the result is always True.
    """
    base_model = get_dict_value(data_meta, 'ss_sd_model_name', None)
    if base_model:
        info_data['baseModelFile'] = base_model

    # Total every tag's count across all of the frequency buckets.
    tag_words = {}
    tag_frequency = data_meta['ss_tag_frequency'] if 'ss_tag_frequency' in data_meta else None
    if isinstance(tag_frequency, dict):
        for bucket in tag_frequency.values():
            if not isinstance(bucket, dict):
                continue
            for tag, count in bucket.items():
                if tag not in tag_words:
                    tag_words[tag] = {'word': tag, 'count': 0, 'metadata': True}
                tag_words[tag]['count'] = tag_words[tag]['count'] + count

    if 'trainedWords' in info_data:
        # Existing entries may carry other data (e.g. civitai flags), so index them by word and
        # overlay the metadata fields on top instead of replacing the list.
        by_word = {}
        for existing in info_data['trainedWords']:
            by_word[existing['word']] = existing
        for tag, entry in tag_words.items():
            previous = by_word[tag] if tag in by_word else {}
            by_word[tag] = {**previous, **entry}
        info_data['trainedWords'] = list(by_word.values())
    else:
        info_data['trainedWords'] = list(tag_words.values())

    info_data['raw']['metadata'] = data_meta

    if 'sha256' not in info_data and '_sha256' in data_meta:
        info_data['sha256'] = data_meta['_sha256']

    return True
def _merge_civitai_data(info_data: dict, data_civitai: dict) -> bool:
    """Merge a civitai model-version payload into `info_data` in place.

    Args:
      info_data: The model info dict to update.
      data_civitai: The civitai data as returned by `_get_model_civitai_data`. It may be empty
        (or None) when nothing could be fetched; that is tolerated.

    Returns:
      True if `info_data` was changed and should be saved.
    """
    if data_civitai is None:
        data_civitai = {}
    should_save = False

    if 'name' not in info_data:
        info_data['name'] = get_dict_value(data_civitai, 'model.name', '')
        should_save = True
        version_name = get_dict_value(data_civitai, 'name')
        if version_name is not None:
            info_data['name'] += f' - {version_name}'

    if 'type' not in info_data:
        info_data['type'] = get_dict_value(data_civitai, 'model.type')
        should_save = True
    if 'baseModel' not in info_data:
        info_data['baseModel'] = get_dict_value(data_civitai, 'baseModel')
        should_save = True

    # We always want to merge the trigger words. The joined string is normalized and re-split on
    # commas, so an entry that itself contains commas yields several words.
    civitai_trigger = get_dict_value(data_civitai, 'triggerWords', default=[])
    civitai_trained = get_dict_value(data_civitai, 'trainedWords', default=[])
    civitai_words = ','.join(civitai_trigger + civitai_trained)
    if civitai_words:
        civitai_words = re.sub(r"\s*,\s*", ",", civitai_words)
        civitai_words = re.sub(r",+", ",", civitai_words)
        civitai_words = re.sub(r"^,", "", civitai_words)
        civitai_words = re.sub(r",$", "", civitai_words)
    if civitai_words:
        trained_words = info_data.setdefault('trainedWords', [])
        for trigger_word in civitai_words.split(','):
            word_data = next((data for data in trained_words if data['word'] == trigger_word),
                             None)
            if word_data is None:
                word_data = {'word': trigger_word}
                trained_words.append(word_data)
            if word_data.get('civitai') is not True:
                word_data['civitai'] = True
                should_save = True

    # '_sha256' is only present on a real (fetched or cached) payload, not on an empty default.
    if 'sha256' not in info_data and '_sha256' in data_civitai:
        info_data['sha256'] = data_civitai['_sha256']
        should_save = True

    if 'modelId' in data_civitai:
        links = info_data.setdefault('links', [])
        civitai_link = f'https://civitai.com/models/{get_dict_value(data_civitai, "modelId")}'
        version_id = get_dict_value(data_civitai, "id")
        if version_id:
            civitai_link += f'?modelVersionId={version_id}'
        # Only add links that are not stored yet, so re-merging does not pile up duplicates.
        for link in (civitai_link, data_civitai.get('_civitai_api')):
            if link and link not in links:
                links.append(link)
                should_save = True

    # Take images from civitai
    if 'images' in data_civitai:
        images = info_data.setdefault('images', [])
        known_urls = {img['url'] if 'url' in img else None for img in images}
        for img in data_civitai['images'] or []:
            img_url = get_dict_value(img, 'url')
            if img_url is None or img_url in known_urls:
                continue
            known_urls.add(img_url)
            # The civitai image page id is taken from the url's file name, minus its extension.
            img_id = os.path.splitext(os.path.basename(img_url))[0]
            images.append({
                'url': img_url,
                'civitaiUrl': f'https://civitai.com/images/{img_id}',
                'width': get_dict_value(img, 'width'),
                'height': get_dict_value(img, 'height'),
                'type': get_dict_value(img, 'type'),
                'nsfwLevel': get_dict_value(img, 'nsfwLevel'),
                'seed': get_dict_value(img, 'meta.seed'),
                'positive': get_dict_value(img, 'meta.prompt'),
                'negative': get_dict_value(img, 'meta.negativePrompt'),
                'steps': get_dict_value(img, 'meta.steps'),
                'sampler': get_dict_value(img, 'meta.sampler'),
                'cfg': get_dict_value(img, 'meta.cfgScale'),
                'model': get_dict_value(img, 'meta.Model'),
                'resources': get_dict_value(img, 'meta.resources'),
            })
            should_save = True

    # The raw data
    # NOTE(review): an existing raw civitai entry is kept even on a forced refresh; confirm that
    # this is intended.
    raw = info_data.setdefault('raw', {})
    if 'civitai' not in raw:
        raw['civitai'] = data_civitai
        should_save = True

    return should_save
def _get_model_civitai_data(file: str, model_type, default=None, refresh=False):
    """Get the civitai data for a model, from the user-data cache or the civitai api.

    Args:
      file: The model file name.
      model_type: The model folder type.
      default: Returned when the model cannot be hashed or no usable data is available.
      refresh: When True, query the api even if a cached entry exists.

    Returns:
      The civitai response dict, tagged with `_sha256` and `_civitai_api`, or `default`.
    """
    file_hash = _get_sha256_hash(get_folder_path(file, model_type))
    if file_hash is None:
        # Honor the caller's default, as _get_model_metadata does.
        return default

    json_file_path = _get_info_cache_file(file_hash, 'civitai')
    api_url = f'https://civitai.com/api/v1/model-versions/by-hash/{file_hash}'

    file_data = read_userdata_json(json_file_path)
    if file_data is None or refresh is True:
        try:
            # requests' timeout is in seconds; the previous value of 5000 (presumably meant as
            # milliseconds) let a stalled connection block for over an hour.
            response = requests.get(api_url, timeout=5)
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-json body on requests versions whose json() does not
            # raise a RequestException subclass. Fall back to whatever was cached.
            print(e)
        else:
            save_userdata_json(json_file_path, {
                'url': api_url,
                'timestamp': datetime.now().timestamp(),
                'response': data
            })
            file_data = read_userdata_json(json_file_path)

    response = file_data.get('response') if isinstance(file_data, dict) else None
    if not isinstance(response, dict):
        return default
    response['_sha256'] = file_hash
    response['_civitai_api'] = api_url
    return response
def _get_model_metadata(file: str, model_type, default=None, refresh=False):
    """Get the metadata embedded in the model file, going through the user-data cache.

    The file header is read when nothing is cached yet or when `refresh` is True; a successful
    read replaces the cache entry. The returned dict is tagged with `_sha256`; `default` is
    returned when the model cannot be hashed or no metadata is available.
    """
    model_path = get_folder_path(file, model_type)
    model_hash = _get_sha256_hash(model_path)
    if model_hash is None:
        return default

    cache_path = _get_info_cache_file(model_hash, 'metadata')
    cached = read_userdata_json(cache_path)

    must_read_header = cached is None or refresh is True
    if must_read_header:
        header_meta = _read_file_metadata_from_header(model_path)
        if header_meta is not None:
            cached = {
                'url': file,
                'timestamp': datetime.now().timestamp(),
                'response': header_meta,
            }
            save_userdata_json(cache_path, cached)

    if cached is None or 'response' not in cached:
        return default
    metadata = cached['response']
    if metadata is None:
        return default
    metadata['_sha256'] = model_hash
    return metadata
def _read_file_metadata_from_header(file_path: str) -> dict:
    """Read a .safetensors file's header and return its `__metadata__` dict, if available.

    String values that look like json objects ("{...}") are decoded in place when they parse.

    Args:
      file_path: Path of the model file. Only paths ending in ".safetensors" are read.

    Returns:
      The metadata dict, or None when the file is not a safetensors file, cannot be read, has
      an invalid header, or carries no `__metadata__` dict. Read and parse problems are printed
      rather than raised.
    """
    if not file_path.endswith('.safetensors'):
        return None

    try:
        with open(file_path, "rb") as file:
            # https://github.com/huggingface/safetensors#format
            # 8 bytes: N, an unsigned little-endian 64-bit integer, containing the size of the
            # header.
            header_size = int.from_bytes(file.read(8), "little", signed=False)
            if header_size <= 0:
                raise BufferError("Invalid header size")
            # A corrupt size field could otherwise request an enormous allocation.
            if header_size > os.fstat(file.fileno()).st_size - 8:
                raise BufferError("Invalid header size")
            header = file.read(header_size)
            if len(header) < header_size:
                raise BufferError("Invalid header")
        header_json = json.loads(header)
    except (OSError, ValueError, BufferError) as e:
        # ValueError covers json and unicode decoding errors in the header.
        print(e)
        return None

    data = header_json.get("__metadata__") if isinstance(header_json, dict) else None
    if not isinstance(data, dict):
        return None

    for key, value in data.items():
        if isinstance(value, str) and value.startswith('{') and value.endswith('}'):
            try:
                data[key] = json.loads(value)
            except (ValueError, RecursionError):
                print(f'metdata for field {key} did not parse as json')
    return data
def get_folder_path(file: str, model_type):
    """Resolve the model's full path, returning None unless the path exists.

    The path comes from `folder_paths.get_full_path`; when that path does not exist as given,
    its absolute form is tried before giving up.
    """
    resolved = folder_paths.get_full_path(model_type, file)
    if resolved and not path_exists(resolved):
        resolved = os.path.abspath(resolved)
    return resolved if path_exists(resolved) else None
def _get_sha256_hash(file_path: str):
    """Return the sha256 hex digest of the file, or None for an empty or missing path."""
    if not (file_path and path_exists(file_path)):
        return None

    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        # Feed the file through the hasher in 4K blocks so it is never held in memory whole.
        while True:
            chunk = stream.read(4096)
            if chunk == b"":
                break
            digest.update(chunk)
    return digest.hexdigest()
async def set_model_info_partial(file: str, model_type: str, info_data_partial):
    """Overlay `info_data_partial` on the model's current info and save the result.

    Keys in the partial replace same-named top-level keys of the existing info.
    """
    existing = await get_model_info(file, model_type, default={})
    save_model_info(file, {**existing, **info_data_partial}, model_type)
def save_model_info(file: str, info_data, model_type):
    """Write `info_data` to `<model path>.rgthree-info.json`, next to the model file.

    Nothing is written when the model file cannot be resolved.
    """
    model_path = get_folder_path(file, model_type)
    if model_path is not None:
        save_json_file(f'{model_path}.rgthree-info.json', info_data)