File size: 16,461 Bytes
4450790
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
import hashlib
import requests
import json
import re
import os
from datetime import datetime

from server import PromptServer
import folder_paths

from ..utils import get_dict_value, load_json_file, path_exists, save_json_file
from ..utils_userdata import read_userdata_json, save_userdata_json, delete_userdata_file


def _get_info_cache_file(data_type: str, file_hash: str):
  return f'info/{file_hash}.{data_type}.json'


async def delete_model_info(file: str,

                            model_type,

                            del_info=True,

                            del_metadata=True,

                            del_civitai=True):
  """Delete the info json, and the civitai & metadata caches."""
  file_path = get_folder_path(file, model_type)
  if file_path is None:
    return
  if del_info:
    try_info_path = f'{file_path}.rgthree-info.json'
    if os.path.isfile(try_info_path):
      os.remove(try_info_path)
  if del_civitai or del_metadata:
    file_hash = _get_sha256_hash(file_path)
    if del_civitai:
      json_file_path = _get_info_cache_file(file_hash, 'civitai')
      delete_userdata_file(json_file_path)
    if del_metadata:
      json_file_path = _get_info_cache_file(file_hash, 'metadata')
      delete_userdata_file(json_file_path)


async def get_model_info(file: str,

                         model_type,

                         default=None,

                         maybe_fetch_civitai=False,

                         force_fetch_civitai=False,

                         maybe_fetch_metadata=False,

                         force_fetch_metadata=False,

                         light=False):
  """Compiles a model info given a stored file next to the model, and/or metadata/civitai."""

  file_path = get_folder_path(file, model_type)
  if file_path is None:
    return default

  info_data = {}
  should_save = False
  # Try to load a rgthree-info.json file next to the file.
  try_info_path = f'{file_path}.rgthree-info.json'
  if path_exists(try_info_path):
    info_data = load_json_file(try_info_path)

  if 'file' not in info_data:
    info_data['file'] = file
    should_save = True
  if 'path' not in info_data:
    info_data['path'] = file_path
    should_save = True

  # Check if we have an image next to the file and, if so, add it to the front of the images
  # (if it isn't already).
  img_next_to_file = None
  for ext in ['jpg', 'png', 'jpeg']:
    try_path = f'{os.path.splitext(file_path)[0]}.{ext}'
    if path_exists(try_path):
      img_next_to_file = try_path
      break

  if 'images' not in info_data:
    info_data['images'] = []
    should_save = True

  if img_next_to_file:
    img_next_to_file_url = f'/rgthree/api/loras/img?file={file}'
    if len(info_data['images']) == 0 or info_data['images'][0]['url'] != img_next_to_file_url:
      info_data['images'].insert(0, {'url': img_next_to_file_url})
      should_save = True

  # If we just want light data then bail now with just existing data, plus file, path and img if
  # next to the file.
  if light and not maybe_fetch_metadata and not force_fetch_metadata and not maybe_fetch_civitai and not force_fetch_civitai:
    return info_data

  if 'raw' not in info_data:
    info_data['raw'] = {}
    should_save = True

  should_save = _update_data(info_data) or should_save

  should_fetch_civitai = force_fetch_civitai is True or (maybe_fetch_civitai is True and
                                                         'civitai' not in info_data['raw'])
  should_fetch_metadata = force_fetch_metadata is True or (maybe_fetch_metadata is True and
                                                           'metadata' not in info_data['raw'])

  if should_fetch_metadata:
    data_meta = _get_model_metadata(file, model_type, default={}, refresh=force_fetch_metadata)
    should_save = _merge_metadata(info_data, data_meta) or should_save

  if should_fetch_civitai:
    data_civitai = _get_model_civitai_data(file,
                                           model_type,
                                           default={},
                                           refresh=force_fetch_civitai)
    should_save = _merge_civitai_data(info_data, data_civitai) or should_save

  if 'sha256' not in info_data:
    file_hash = _get_sha256_hash(file_path)
    if file_hash is not None:
      info_data['sha256'] = file_hash
      should_save = True

  if should_save:
    if 'trainedWords' in info_data:
      # Sort by count; if it doesn't exist, then assume it's a top item from civitai or elsewhere.
      info_data['trainedWords'] = sorted(info_data['trainedWords'],
                                         key=lambda w: w['count'] if 'count' in w else 99999,
                                         reverse=True)
    save_model_info(file, info_data, model_type)

    # If we're saving, then the UI is likely waiting to see if the refreshed data is coming in.
    await PromptServer.instance.send("rgthree-refreshed-lora-info", {"data": info_data})

  return info_data


def _update_data(info_data: dict) -> bool:
  """Ports old data to new data if necessary."""
  should_save = False
  # If we have "triggerWords" then move them over to "trainedWords"
  if 'triggerWords' in info_data and len(info_data['triggerWords']) > 0:
    civitai_words = ','.join((get_dict_value(info_data, 'raw.civitai.triggerWords', default=[]) +
                              get_dict_value(info_data, 'raw.civitai.trainedWords', default=[])))
    if 'trainedWords' not in info_data:
      info_data['trainedWords'] = []
    for trigger_word in info_data['triggerWords']:
      word_data = next((data for data in info_data['trainedWords'] if data['word'] == trigger_word),
                       None)
      if word_data is None:
        word_data = {'word': trigger_word}
        info_data['trainedWords'].append(word_data)
      if trigger_word in civitai_words:
        word_data['civitai'] = True
      else:
        word_data['user'] = True

    del info_data['triggerWords']
    should_save = True
  return should_save


def _merge_metadata(info_data: dict, data_meta: dict) -> bool:
  """Returns true if data was saved."""
  should_save = False

  base_model_file = get_dict_value(data_meta, 'ss_sd_model_name', None)
  if base_model_file:
    info_data['baseModelFile'] = base_model_file

  # Loop over metadata tags
  trained_words = {}
  if 'ss_tag_frequency' in data_meta and isinstance(data_meta['ss_tag_frequency'], dict):
    for bucket_value in data_meta['ss_tag_frequency'].values():
      if isinstance(bucket_value, dict):
        for tag, count in bucket_value.items():
          if tag not in trained_words:
            trained_words[tag] = {'word': tag, 'count': 0, 'metadata': True}
          trained_words[tag]['count'] = trained_words[tag]['count'] + count

  if 'trainedWords' not in info_data:
    info_data['trainedWords'] = list(trained_words.values())
    should_save = True
  else:
    # We can't merge, because the list may have other data, like it's part of civitaidata.
    merged_dict = {}
    for existing_word_data in info_data['trainedWords']:
      merged_dict[existing_word_data['word']] = existing_word_data
    for new_key, new_word_data in trained_words.items():
      if new_key not in merged_dict:
        merged_dict[new_key] = {}
      merged_dict[new_key] = {**merged_dict[new_key], **new_word_data}
    info_data['trainedWords'] = list(merged_dict.values())
    should_save = True

  # trained_words = list(trained_words.values())
  # info_data['meta_trained_words'] = trained_words
  info_data['raw']['metadata'] = data_meta
  should_save = True

  if 'sha256' not in info_data and '_sha256' in data_meta:
    info_data['sha256'] = data_meta['_sha256']
    should_save = True

  return should_save


def _merge_civitai_data(info_data: dict, data_civitai: dict) -> bool:
  """Returns true if data was saved."""
  should_save = False

  if 'name' not in info_data:
    info_data['name'] = get_dict_value(data_civitai, 'model.name', '')
    should_save = True
    version_name = get_dict_value(data_civitai, 'name')
    if version_name is not None:
      info_data['name'] += f' - {version_name}'

  if 'type' not in info_data:
    info_data['type'] = get_dict_value(data_civitai, 'model.type')
    should_save = True
  if 'baseModel' not in info_data:
    info_data['baseModel'] = get_dict_value(data_civitai, 'baseModel')
    should_save = True

  # We always want to merge triggerword.
  civitai_trigger = get_dict_value(data_civitai, 'triggerWords', default=[])
  civitai_trained = get_dict_value(data_civitai, 'trainedWords', default=[])
  civitai_words = ','.join(civitai_trigger + civitai_trained)
  if civitai_words:
    civitai_words = re.sub(r"\s*,\s*", ",", civitai_words)
    civitai_words = re.sub(r",+", ",", civitai_words)
    civitai_words = re.sub(r"^,", "", civitai_words)
    civitai_words = re.sub(r",$", "", civitai_words)
    if civitai_words:
      civitai_words = civitai_words.split(',')
      if 'trainedWords' not in info_data:
        info_data['trainedWords'] = []
      for trigger_word in civitai_words:
        word_data = next(
          (data for data in info_data['trainedWords'] if data['word'] == trigger_word), None)
        if word_data is None:
          word_data = {'word': trigger_word}
          info_data['trainedWords'].append(word_data)
        word_data['civitai'] = True

  if 'sha256' not in info_data:
    info_data['sha256'] = data_civitai['_sha256']
    should_save = True

  if 'modelId' in data_civitai:
    info_data['links'] = info_data['links'] if 'links' in info_data else []
    civitai_link = f'https://civitai.com/models/{get_dict_value(data_civitai, "modelId")}'
    if get_dict_value(data_civitai, "id"):
      civitai_link += f'?modelVersionId={get_dict_value(data_civitai, "id")}'
    info_data['links'].append(civitai_link)
    info_data['links'].append(data_civitai['_civitai_api'])
    should_save = True

  # Take images from civitai
  if 'images' in data_civitai:
    info_data_image_urls = list(map(lambda i: i['url']
                                    if 'url' in i else None, info_data['images']))
    for img in data_civitai['images']:
      img_url = get_dict_value(img, 'url')
      if img_url is not None and img_url not in info_data_image_urls:
        img_id = os.path.splitext(os.path.basename(img_url))[0] if img_url is not None else None
        img_data = {
          'url': img_url,
          'civitaiUrl': f'https://civitai.com/images/{img_id}' if img_id is not None else None,
          'width': get_dict_value(img, 'width'),
          'height': get_dict_value(img, 'height'),
          'type': get_dict_value(img, 'type'),
          'nsfwLevel': get_dict_value(img, 'nsfwLevel'),
          'seed': get_dict_value(img, 'meta.seed'),
          'positive': get_dict_value(img, 'meta.prompt'),
          'negative': get_dict_value(img, 'meta.negativePrompt'),
          'steps': get_dict_value(img, 'meta.steps'),
          'sampler': get_dict_value(img, 'meta.sampler'),
          'cfg': get_dict_value(img, 'meta.cfgScale'),
          'model': get_dict_value(img, 'meta.Model'),
          'resources': get_dict_value(img, 'meta.resources'),
        }
        info_data['images'].append(img_data)
        should_save = True

  # The raw data
  if 'civitai' not in info_data['raw']:
    info_data['raw']['civitai'] = data_civitai
    should_save = True

  return should_save


def _get_model_civitai_data(file: str, model_type, default=None, refresh=False):
  """Gets the civitai data, either cached from the user directory, or from civitai api."""
  file_hash = _get_sha256_hash(get_folder_path(file, model_type))
  if file_hash is None:
    return None

  json_file_path = _get_info_cache_file(file_hash, 'civitai')

  api_url = f'https://civitai.com/api/v1/model-versions/by-hash/{file_hash}'
  file_data = read_userdata_json(json_file_path)
  if file_data is None or refresh is True:
    try:
      response = requests.get(api_url, timeout=5000)
      data = response.json()
      save_userdata_json(json_file_path, {
        'url': api_url,
        'timestamp': datetime.now().timestamp(),
        'response': data
      })
      file_data = read_userdata_json(json_file_path)
    except requests.exceptions.RequestException as e:  # This is the correct syntax
      print(e)
  response = file_data['response'] if file_data is not None and 'response' in file_data else None
  if response is not None:
    response['_sha256'] = file_hash
    response['_civitai_api'] = api_url
  return response if response is not None else default


def _get_model_metadata(file: str, model_type, default=None, refresh=False):
  """Gets the metadata from the file itself."""
  file_path = get_folder_path(file, model_type)
  file_hash = _get_sha256_hash(file_path)
  if file_hash is None:
    return default

  json_file_path = _get_info_cache_file(file_hash, 'metadata')

  file_data = read_userdata_json(json_file_path)
  if file_data is None or refresh is True:
    data = _read_file_metadata_from_header(file_path)
    if data is not None:
      file_data = {'url': file, 'timestamp': datetime.now().timestamp(), 'response': data}
      save_userdata_json(json_file_path, file_data)
  response = file_data['response'] if file_data is not None and 'response' in file_data else None
  if response is not None:
    response['_sha256'] = file_hash
  return response if response is not None else default


def _read_file_metadata_from_header(file_path: str) -> dict:
  """Reads the file's header and returns a JSON dict metdata if available."""
  data = None
  try:
    if file_path.endswith('.safetensors'):
      with open(file_path, "rb") as file:
        # https://github.com/huggingface/safetensors#format
        # 8 bytes: N, an unsigned little-endian 64-bit integer, containing the size of the header
        header_size = int.from_bytes(file.read(8), "little", signed=False)

        if header_size <= 0:
          raise BufferError("Invalid header size")

        header = file.read(header_size)
        if header is None:
          raise BufferError("Invalid header")

        header_json = json.loads(header)
        data = header_json["__metadata__"] if "__metadata__" in header_json else None

        if data is not None:
          for key, value in data.items():
            if isinstance(value, str) and value.startswith('{') and value.endswith('}'):
              try:
                value_as_json = json.loads(value)
                data[key] = value_as_json
              except Exception:
                print(f'metdata for field {key} did not parse as json')
  except requests.exceptions.RequestException as e:
    print(e)
    data = None

  return data


def get_folder_path(file: str, model_type):
  """Gets the file path ensuring it exists."""
  file_path = folder_paths.get_full_path(model_type, file)
  if file_path and not path_exists(file_path):
    file_path = os.path.abspath(file_path)
  if not path_exists(file_path):
    file_path = None
  return file_path


def _get_sha256_hash(file_path: str):
  """Returns the hash for the file."""
  if not file_path or not path_exists(file_path):
    return None
  file_hash = None
  sha256_hash = hashlib.sha256()
  with open(file_path, "rb") as f:
    # Read and update hash string value in blocks of 4K
    for byte_block in iter(lambda: f.read(4096), b""):
      sha256_hash.update(byte_block)
    file_hash = sha256_hash.hexdigest()
  return file_hash


async def set_model_info_partial(file: str, model_type: str, info_data_partial):
  """Sets partial data into the existing model info data."""
  info_data = await get_model_info(file, model_type, default={})
  info_data = {**info_data, **info_data_partial}
  save_model_info(file, info_data, model_type)


def save_model_info(file: str, info_data, model_type):
  """Saves the model info alongside the model itself."""
  file_path = get_folder_path(file, model_type)
  if file_path is None:
    return
  try_info_path = f'{file_path}.rgthree-info.json'
  save_json_file(try_info_path, info_data)