gartajackhats1985's picture
Upload 1830 files
07f408f verified
import os
import sys
import subprocess
import re
import shutil
import configparser
import platform
from datetime import datetime
import git
from git.remote import RemoteProgress
from urllib.parse import urlparse
from tqdm.auto import tqdm
import aiohttp
import threading
import json
import time
import yaml
import zipfile
glob_path = os.path.join(os.path.dirname(__file__)) # ComfyUI-Manager/glob
sys.path.append(glob_path)
import cm_global
from manager_util import *
version = [2, 51, 9]
version_str = f"V{version[0]}.{version[1]}" + (f'.{version[2]}' if len(version) > 2 else '')
comfyui_manager_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
custom_nodes_path = os.path.abspath(os.path.join(comfyui_manager_path, '..'))
comfy_path = os.environ.get('COMFYUI_PATH')
if comfy_path is None:
comfy_path = os.path.abspath(os.path.join(custom_nodes_path, '..'))
channel_list_path = os.path.join(comfyui_manager_path, 'channels.list')
config_path = os.path.join(comfyui_manager_path, "config.ini")
startup_script_path = os.path.join(comfyui_manager_path, "startup-scripts")
git_script_path = os.path.join(comfyui_manager_path, "git_helper.py")
cache_dir = os.path.join(comfyui_manager_path, '.cache')
cached_config = None
js_path = None
comfy_ui_required_revision = 1930
comfy_ui_required_commit_datetime = datetime(2024, 1, 24, 0, 0, 0)
comfy_ui_revision = "Unknown"
comfy_ui_commit_datetime = datetime(1900, 1, 1, 0, 0, 0)
cache_lock = threading.Lock()
channel_dict = None
channel_list = None
pip_map = None
def remap_pip_package(pkg):
if pkg in cm_global.pip_overrides:
res = cm_global.pip_overrides[pkg]
print(f"[ComfyUI-Manager] '{pkg}' is remapped to '{res}'")
return res
else:
return pkg
def get_installed_packages():
global pip_map
if pip_map is None:
try:
result = subprocess.check_output([sys.executable, '-m', 'pip', 'list'], universal_newlines=True)
pip_map = {}
for line in result.split('\n'):
x = line.strip()
if x:
y = line.split()
if y[0] == 'Package' or y[0].startswith('-'):
continue
pip_map[y[0]] = y[1]
except subprocess.CalledProcessError as e:
print(f"[ComfyUI-Manager] Failed to retrieve the information of installed pip packages.")
return set()
return pip_map
def clear_pip_cache():
global pip_map
pip_map = None
def is_blacklisted(name):
name = name.strip()
pattern = r'([^<>!=]+)([<>!=]=?)([^ ]*)'
match = re.search(pattern, name)
if match:
name = match.group(1)
if name in cm_global.pip_blacklist:
return True
if name in cm_global.pip_downgrade_blacklist:
pips = get_installed_packages()
if match is None:
if name in pips:
return True
elif match.group(2) in ['<=', '==', '<']:
if name in pips:
if StrictVersion(pips[name]) >= StrictVersion(match.group(3)):
return True
return False
def is_installed(name):
name = name.strip()
if name.startswith('#'):
return True
pattern = r'([^<>!=]+)([<>!=]=?)([0-9.a-zA-Z]*)'
match = re.search(pattern, name)
if match:
name = match.group(1)
if name in cm_global.pip_blacklist:
return True
if name in cm_global.pip_downgrade_blacklist:
pips = get_installed_packages()
if match is None:
if name in pips:
return True
elif match.group(2) in ['<=', '==', '<']:
if name in pips:
if StrictVersion(pips[name]) >= StrictVersion(match.group(3)):
print(f"[ComfyUI-Manager] skip black listed pip installation: '{name}'")
return True
return name.lower() in get_installed_packages()
def get_channel_dict():
global channel_dict
if channel_dict is None:
channel_dict = {}
if not os.path.exists(channel_list_path):
shutil.copy(channel_list_path+'.template', channel_list_path)
with open(os.path.join(comfyui_manager_path, 'channels.list'), 'r') as file:
channels = file.read()
for x in channels.split('\n'):
channel_info = x.split("::")
if len(channel_info) == 2:
channel_dict[channel_info[0]] = channel_info[1]
return channel_dict
def get_channel_list():
global channel_list
if channel_list is None:
channel_list = []
for k, v in get_channel_dict().items():
channel_list.append(f"{k}::{v}")
return channel_list
class ManagerFuncs:
def __init__(self):
pass
def get_current_preview_method(self):
return "none"
def run_script(self, cmd, cwd='.'):
if len(cmd) > 0 and cmd[0].startswith("#"):
print(f"[ComfyUI-Manager] Unexpected behavior: `{cmd}`")
return 0
new_env = os.environ.copy()
new_env["COMFYUI_PATH"] = comfy_path
subprocess.check_call(cmd, cwd=cwd, env=new_env)
return 0
manager_funcs = ManagerFuncs()
def write_config():
config = configparser.ConfigParser()
config['default'] = {
'preview_method': manager_funcs.get_current_preview_method(),
'badge_mode': get_config()['badge_mode'],
'git_exe': get_config()['git_exe'],
'channel_url': get_config()['channel_url'],
'share_option': get_config()['share_option'],
'bypass_ssl': get_config()['bypass_ssl'],
"file_logging": get_config()['file_logging'],
'default_ui': get_config()['default_ui'],
'component_policy': get_config()['component_policy'],
'double_click_policy': get_config()['double_click_policy'],
'windows_selector_event_loop_policy': get_config()['windows_selector_event_loop_policy'],
'model_download_by_agent': get_config()['model_download_by_agent'],
'downgrade_blacklist': get_config()['downgrade_blacklist'],
'security_level': get_config()['security_level'],
}
with open(config_path, 'w') as configfile:
config.write(configfile)
def read_config():
try:
config = configparser.ConfigParser()
config.read(config_path)
default_conf = config['default']
# policy migration: disable_unsecure_features -> security_level
if 'disable_unsecure_features' in default_conf:
if default_conf['disable_unsecure_features'].lower() == 'true':
security_level = 'strong'
else:
security_level = 'normal'
else:
security_level = default_conf['security_level'] if 'security_level' in default_conf else 'normal'
return {
'preview_method': default_conf['preview_method'] if 'preview_method' in default_conf else manager_funcs.get_current_preview_method(),
'badge_mode': default_conf['badge_mode'] if 'badge_mode' in default_conf else 'none',
'git_exe': default_conf['git_exe'] if 'git_exe' in default_conf else '',
'channel_url': default_conf['channel_url'] if 'channel_url' in default_conf else 'https://raw.githubusercontent.com/ltdrdata/ComfyUI-Manager/main',
'share_option': default_conf['share_option'] if 'share_option' in default_conf else 'all',
'bypass_ssl': default_conf['bypass_ssl'].lower() == 'true' if 'bypass_ssl' in default_conf else False,
'file_logging': default_conf['file_logging'].lower() == 'true' if 'file_logging' in default_conf else True,
'default_ui': default_conf['default_ui'] if 'default_ui' in default_conf else 'none',
'component_policy': default_conf['component_policy'] if 'component_policy' in default_conf else 'workflow',
'double_click_policy': default_conf['double_click_policy'] if 'double_click_policy' in default_conf else 'copy-all',
'windows_selector_event_loop_policy': default_conf['windows_selector_event_loop_policy'].lower() == 'true' if 'windows_selector_event_loop_policy' in default_conf else False,
'model_download_by_agent': default_conf['model_download_by_agent'].lower() == 'true' if 'model_download_by_agent' in default_conf else False,
'downgrade_blacklist': default_conf['downgrade_blacklist'] if 'downgrade_blacklist' in default_conf else '',
'security_level': security_level
}
except Exception:
return {
'preview_method': manager_funcs.get_current_preview_method(),
'badge_mode': 'none',
'git_exe': '',
'channel_url': 'https://raw.githubusercontent.com/ltdrdata/ComfyUI-Manager/main',
'share_option': 'all',
'bypass_ssl': False,
'file_logging': True,
'default_ui': 'none',
'component_policy': 'workflow',
'double_click_policy': 'copy-all',
'windows_selector_event_loop_policy': False,
'model_download_by_agent': False,
'downgrade_blacklist': '',
'security_level': 'normal',
}
def get_config():
global cached_config
if cached_config is None:
cached_config = read_config()
return cached_config
def switch_to_default_branch(repo):
show_result = repo.git.remote("show", "origin")
matches = re.search(r"\s*HEAD branch:\s*(.*)", show_result)
if matches:
default_branch = matches.group(1)
repo.git.checkout(default_branch)
def try_install_script(url, repo_path, install_cmd, instant_execution=False):
if not instant_execution and ((len(install_cmd) > 0 and install_cmd[0].startswith('#')) or (platform.system() == "Windows" and comfy_ui_commit_datetime.date() >= comfy_ui_required_commit_datetime.date())):
if not os.path.exists(startup_script_path):
os.makedirs(startup_script_path)
script_path = os.path.join(startup_script_path, "install-scripts.txt")
with open(script_path, "a") as file:
obj = [repo_path] + install_cmd
file.write(f"{obj}\n")
return True
else:
if len(install_cmd) == 5 and install_cmd[2:4] == ['pip', 'install']:
if is_blacklisted(install_cmd[4]):
print(f"[ComfyUI-Manager] skip black listed pip installation: '{install_cmd[4]}'")
return True
print(f"\n## ComfyUI-Manager: EXECUTE => {install_cmd}")
code = manager_funcs.run_script(install_cmd, cwd=repo_path)
if platform.system() != "Windows":
try:
if comfy_ui_commit_datetime.date() < comfy_ui_required_commit_datetime.date():
print("\n\n###################################################################")
print(f"[WARN] ComfyUI-Manager: Your ComfyUI version ({comfy_ui_revision})[{comfy_ui_commit_datetime.date()}] is too old. Please update to the latest version.")
print(f"[WARN] The extension installation feature may not work properly in the current installed ComfyUI version on Windows environment.")
print("###################################################################\n\n")
except:
pass
if code != 0:
if url is None:
url = os.path.dirname(repo_path)
print(f"install script failed: {url}")
return False
# use subprocess to avoid file system lock by git (Windows)
def __win_check_git_update(path, do_fetch=False, do_update=False):
if do_fetch:
command = [sys.executable, git_script_path, "--fetch", path]
elif do_update:
command = [sys.executable, git_script_path, "--pull", path]
else:
command = [sys.executable, git_script_path, "--check", path]
new_env = os.environ.copy()
new_env["COMFYUI_PATH"] = comfy_path
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=custom_nodes_path)
output, _ = process.communicate()
output = output.decode('utf-8').strip()
if 'detected dubious' in output:
# fix and try again
safedir_path = path.replace('\\', '/')
try:
print(f"[ComfyUI-Manager] Try fixing 'dubious repository' error on '{safedir_path}' repo")
process = subprocess.Popen(['git', 'config', '--global', '--add', 'safe.directory', safedir_path], env=new_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, _ = process.communicate()
process = subprocess.Popen(command, env=new_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, _ = process.communicate()
output = output.decode('utf-8').strip()
except Exception:
print(f'[ComfyUI-Manager] failed to fixing')
if 'detected dubious' in output:
print(f'\n[ComfyUI-Manager] Failed to fixing repository setup. Please execute this command on cmd: \n'
f'-----------------------------------------------------------------------------------------\n'
f'git config --global --add safe.directory "{safedir_path}"\n'
f'-----------------------------------------------------------------------------------------\n')
if do_update:
if "CUSTOM NODE PULL: Success" in output:
process.wait()
print(f"\rUpdated: {path}")
return True, True # updated
elif "CUSTOM NODE PULL: None" in output:
process.wait()
return False, True # there is no update
else:
print(f"\rUpdate error: {path}")
process.wait()
return False, False # update failed
else:
if "CUSTOM NODE CHECK: True" in output:
process.wait()
return True, True
elif "CUSTOM NODE CHECK: False" in output:
process.wait()
return False, True
else:
print(f"\rFetch error: {path}")
print(f"\n{output}\n")
process.wait()
return False, True
def __win_check_git_pull(path):
new_env = os.environ.copy()
new_env["COMFYUI_PATH"] = comfy_path
command = [sys.executable, git_script_path, "--pull", path]
process = subprocess.Popen(command, env=new_env, cwd=custom_nodes_path)
process.wait()
def execute_install_script(url, repo_path, lazy_mode=False, instant_execution=False):
install_script_path = os.path.join(repo_path, "install.py")
requirements_path = os.path.join(repo_path, "requirements.txt")
if lazy_mode:
install_cmd = ["#LAZY-INSTALL-SCRIPT", sys.executable]
try_install_script(url, repo_path, install_cmd)
else:
if os.path.exists(requirements_path):
print("Install: pip packages")
with open(requirements_path, "r") as requirements_file:
for line in requirements_file:
#handle comments
if '#' in line:
if line.strip()[0] == '#':
print("Line is comment...skipping")
continue
else:
line = line.split('#')[0].strip()
package_name = remap_pip_package(line.strip())
if package_name and not package_name.startswith('#'):
if '--index-url' in package_name:
s = package_name.split('--index-url')
install_cmd = [sys.executable, "-m", "pip", "install", s[0].strip(), '--index-url', s[1].strip()]
else:
install_cmd = [sys.executable, "-m", "pip", "install", package_name]
if package_name.strip() != "" and not package_name.startswith('#'):
try_install_script(url, repo_path, install_cmd, instant_execution=instant_execution)
if os.path.exists(install_script_path):
print(f"Install: install script")
install_cmd = [sys.executable, "install.py"]
try_install_script(url, repo_path, install_cmd, instant_execution=instant_execution)
return True
def git_repo_has_updates(path, do_fetch=False, do_update=False):
if do_fetch:
print(f"\x1b[2K\rFetching: {path}", end='')
elif do_update:
print(f"\x1b[2K\rUpdating: {path}", end='')
# Check if the path is a git repository
if not os.path.exists(os.path.join(path, '.git')):
raise ValueError('Not a git repository')
if platform.system() == "Windows":
updated, success = __win_check_git_update(path, do_fetch, do_update)
if updated and success:
execute_install_script(None, path, lazy_mode=True)
return updated, success
else:
# Fetch the latest commits from the remote repository
repo = git.Repo(path)
current_branch = repo.active_branch
branch_name = current_branch.name
remote_name = 'origin'
remote = repo.remote(name=remote_name)
# Get the current commit hash
commit_hash = repo.head.commit.hexsha
if do_fetch or do_update:
remote.fetch()
if do_update:
if repo.head.is_detached:
switch_to_default_branch(repo)
remote_commit_hash = repo.refs[f'{remote_name}/{branch_name}'].object.hexsha
if commit_hash == remote_commit_hash:
repo.close()
return False, True
try:
remote.pull()
repo.git.submodule('update', '--init', '--recursive')
new_commit_hash = repo.head.commit.hexsha
if commit_hash != new_commit_hash:
execute_install_script(None, path)
print(f"\x1b[2K\rUpdated: {path}")
return True, True
else:
return False, False
except Exception as e:
print(f"\nUpdating failed: {path}\n{e}", file=sys.stderr)
return False, False
if repo.head.is_detached:
repo.close()
return True, True
# Get commit hash of the remote branch
current_branch = repo.active_branch
branch_name = current_branch.name
remote_commit_hash = repo.refs[f'{remote_name}/{branch_name}'].object.hexsha
# Compare the commit hashes to determine if the local repository is behind the remote repository
if commit_hash != remote_commit_hash:
# Get the commit dates
commit_date = repo.head.commit.committed_datetime
remote_commit_date = repo.refs[f'{remote_name}/{branch_name}'].object.committed_datetime
# Compare the commit dates to determine if the local repository is behind the remote repository
if commit_date < remote_commit_date:
repo.close()
return True, True
repo.close()
return False, True
class GitProgress(RemoteProgress):
def __init__(self):
super().__init__()
self.pbar = tqdm()
def update(self, op_code, cur_count, max_count=None, message=''):
self.pbar.total = max_count
self.pbar.n = cur_count
self.pbar.pos = 0
self.pbar.refresh()
def is_valid_url(url):
try:
# Check for HTTP/HTTPS URL format
result = urlparse(url)
if all([result.scheme, result.netloc]):
return True
finally:
# Check for SSH git URL format
pattern = re.compile(r"^(.+@|ssh:\/\/).+:.+$")
if pattern.match(url):
return True
return False
def gitclone_install(files, instant_execution=False, msg_prefix=''):
print(f"{msg_prefix}Install: {files}")
for url in files:
if not is_valid_url(url):
print(f"Invalid git url: '{url}'")
return False
if url.endswith("/"):
url = url[:-1]
try:
print(f"Download: git clone '{url}'")
repo_name = os.path.splitext(os.path.basename(url))[0]
repo_path = os.path.join(custom_nodes_path, repo_name)
# Clone the repository from the remote URL
if not instant_execution and platform.system() == 'Windows':
res = manager_funcs.run_script([sys.executable, git_script_path, "--clone", custom_nodes_path, url], cwd=custom_nodes_path)
if res != 0:
return False
else:
repo = git.Repo.clone_from(url, repo_path, recursive=True, progress=GitProgress())
repo.git.clear_cache()
repo.close()
if not execute_install_script(url, repo_path, instant_execution=instant_execution):
return False
except Exception as e:
print(f"Install(git-clone) error: {url} / {e}", file=sys.stderr)
return False
print("Installation was successful.")
return True
def git_pull(path):
# Check if the path is a git repository
if not os.path.exists(os.path.join(path, '.git')):
raise ValueError('Not a git repository')
# Pull the latest changes from the remote repository
if platform.system() == "Windows":
return __win_check_git_pull(path)
else:
repo = git.Repo(path)
if repo.is_dirty():
repo.git.stash()
if repo.head.is_detached:
switch_to_default_branch(repo)
current_branch = repo.active_branch
remote_name = current_branch.tracking_branch().remote_name
remote = repo.remote(name=remote_name)
remote.pull()
repo.git.submodule('update', '--init', '--recursive')
repo.close()
return True
async def get_data(uri, silent=False):
if not silent:
print(f"FETCH DATA from: {uri}", end="")
if uri.startswith("http"):
async with aiohttp.ClientSession(trust_env=True, connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
async with session.get(uri) as resp:
json_text = await resp.text()
else:
with cache_lock:
with open(uri, "r", encoding="utf-8") as f:
json_text = f.read()
json_obj = json.loads(json_text)
if not silent:
print(f" [DONE]")
return json_obj
def simple_hash(input_string):
hash_value = 0
for char in input_string:
hash_value = (hash_value * 31 + ord(char)) % (2**32)
return hash_value
def is_file_created_within_one_day(file_path):
if not os.path.exists(file_path):
return False
file_creation_time = os.path.getctime(file_path)
current_time = datetime.now().timestamp()
time_difference = current_time - file_creation_time
return time_difference <= 86400
async def get_data_by_mode(mode, filename, channel_url=None):
if channel_url in get_channel_dict():
channel_url = get_channel_dict()[channel_url]
try:
if mode == "local":
uri = os.path.join(comfyui_manager_path, filename)
json_obj = await get_data(uri)
else:
if channel_url is None:
uri = get_config()['channel_url'] + '/' + filename
else:
uri = channel_url + '/' + filename
cache_uri = str(simple_hash(uri))+'_'+filename
cache_uri = os.path.join(cache_dir, cache_uri)
if mode == "cache":
if is_file_created_within_one_day(cache_uri):
json_obj = await get_data(cache_uri)
else:
json_obj = await get_data(uri)
with cache_lock:
with open(cache_uri, "w", encoding='utf-8') as file:
json.dump(json_obj, file, indent=4, sort_keys=True)
else:
json_obj = await get_data(uri)
with cache_lock:
with open(cache_uri, "w", encoding='utf-8') as file:
json.dump(json_obj, file, indent=4, sort_keys=True)
except Exception as e:
print(f"[ComfyUI-Manager] Due to a network error, switching to local mode.\n=> {filename}\n=> {e}")
uri = os.path.join(comfyui_manager_path, filename)
json_obj = await get_data(uri)
return json_obj
def gitclone_fix(files, instant_execution=False):
print(f"Try fixing: {files}")
for url in files:
if not is_valid_url(url):
print(f"Invalid git url: '{url}'")
return False
if url.endswith("/"):
url = url[:-1]
try:
repo_name = os.path.splitext(os.path.basename(url))[0]
repo_path = os.path.join(custom_nodes_path, repo_name)
if os.path.exists(repo_path+'.disabled'):
repo_path = repo_path+'.disabled'
if not execute_install_script(url, repo_path, instant_execution=instant_execution):
return False
except Exception as e:
print(f"Install(git-clone) error: {url} / {e}", file=sys.stderr)
return False
print(f"Attempt to fixing '{files}' is done.")
return True
def pip_install(packages):
install_cmd = ['#FORCE', sys.executable, "-m", "pip", "install", '-U'] + packages
try_install_script('pip install via manager', '..', install_cmd)
def rmtree(path):
retry_count = 3
while True:
try:
retry_count -= 1
if platform.system() == "Windows":
manager_funcs.run_script(['attrib', '-R', path + '\\*', '/S'])
shutil.rmtree(path)
return True
except Exception as ex:
print(f"ex: {ex}")
time.sleep(3)
if retry_count < 0:
raise ex
print(f"Uninstall retry({retry_count})")
def gitclone_uninstall(files):
import os
print(f"Uninstall: {files}")
for url in files:
if url.endswith("/"):
url = url[:-1]
try:
dir_name = os.path.splitext(os.path.basename(url))[0].replace(".git", "")
dir_path = os.path.join(custom_nodes_path, dir_name)
# safety check
if dir_path == '/' or dir_path[1:] == ":/" or dir_path == '':
print(f"Uninstall(git-clone) error: invalid path '{dir_path}' for '{url}'")
return False
install_script_path = os.path.join(dir_path, "uninstall.py")
disable_script_path = os.path.join(dir_path, "disable.py")
if os.path.exists(install_script_path):
uninstall_cmd = [sys.executable, "uninstall.py"]
code = manager_funcs.run_script(uninstall_cmd, cwd=dir_path)
if code != 0:
print(f"An error occurred during the execution of the uninstall.py script. Only the '{dir_path}' will be deleted.")
elif os.path.exists(disable_script_path):
disable_script = [sys.executable, "disable.py"]
code = manager_funcs.run_script(disable_script, cwd=dir_path)
if code != 0:
print(f"An error occurred during the execution of the disable.py script. Only the '{dir_path}' will be deleted.")
if os.path.exists(dir_path):
rmtree(dir_path)
elif os.path.exists(dir_path + ".disabled"):
rmtree(dir_path + ".disabled")
except Exception as e:
print(f"Uninstall(git-clone) error: {url} / {e}", file=sys.stderr)
return False
print("Uninstallation was successful.")
return True
def gitclone_set_active(files, is_disable):
import os
if is_disable:
action_name = "Disable"
else:
action_name = "Enable"
print(f"{action_name}: {files}")
for url in files:
if url.endswith("/"):
url = url[:-1]
try:
dir_name = os.path.splitext(os.path.basename(url))[0].replace(".git", "")
dir_path = os.path.join(custom_nodes_path, dir_name)
# safety check
if dir_path == '/' or dir_path[1:] == ":/" or dir_path == '':
print(f"{action_name}(git-clone) error: invalid path '{dir_path}' for '{url}'")
return False
if is_disable:
current_path = dir_path
new_path = dir_path + ".disabled"
else:
current_path = dir_path + ".disabled"
new_path = dir_path
os.rename(current_path, new_path)
if is_disable:
if os.path.exists(os.path.join(new_path, "disable.py")):
disable_script = [sys.executable, "disable.py"]
try_install_script(url, new_path, disable_script)
else:
if os.path.exists(os.path.join(new_path, "enable.py")):
enable_script = [sys.executable, "enable.py"]
try_install_script(url, new_path, enable_script)
except Exception as e:
print(f"{action_name}(git-clone) error: {url} / {e}", file=sys.stderr)
return False
print(f"{action_name} was successful.")
return True
def gitclone_update(files, instant_execution=False, skip_script=False, msg_prefix=""):
import os
print(f"{msg_prefix}Update: {files}")
for url in files:
if url.endswith("/"):
url = url[:-1]
try:
repo_name = os.path.splitext(os.path.basename(url))[0].replace(".git", "")
repo_path = os.path.join(custom_nodes_path, repo_name)
if os.path.exists(repo_path+'.disabled'):
repo_path = repo_path+'.disabled'
git_pull(repo_path)
if not skip_script:
if instant_execution:
if not execute_install_script(url, repo_path, lazy_mode=False, instant_execution=True):
return False
else:
if not execute_install_script(url, repo_path, lazy_mode=True):
return False
except Exception as e:
print(f"Update(git-clone) error: {url} / {e}", file=sys.stderr)
return False
if not skip_script:
print("Update was successful.")
return True
def update_path(repo_path, instant_execution=False):
if not os.path.exists(os.path.join(repo_path, '.git')):
return "fail"
# version check
repo = git.Repo(repo_path)
if repo.head.is_detached:
switch_to_default_branch(repo)
current_branch = repo.active_branch
branch_name = current_branch.name
if current_branch.tracking_branch() is None:
print(f"[ComfyUI-Manager] There is no tracking branch ({current_branch})")
remote_name = 'origin'
else:
remote_name = current_branch.tracking_branch().remote_name
remote = repo.remote(name=remote_name)
try:
remote.fetch()
except Exception as e:
if 'detected dubious' in str(e):
print(f"[ComfyUI-Manager] Try fixing 'dubious repository' error on 'ComfyUI' repository")
safedir_path = comfy_path.replace('\\', '/')
subprocess.run(['git', 'config', '--global', '--add', 'safe.directory', safedir_path])
try:
remote.fetch()
except Exception:
print(f"\n[ComfyUI-Manager] Failed to fixing repository setup. Please execute this command on cmd: \n"
f"-----------------------------------------------------------------------------------------\n"
f'git config --global --add safe.directory "{safedir_path}"\n'
f"-----------------------------------------------------------------------------------------\n")
commit_hash = repo.head.commit.hexsha
remote_commit_hash = repo.refs[f'{remote_name}/{branch_name}'].object.hexsha
if commit_hash != remote_commit_hash:
git_pull(repo_path)
execute_install_script("ComfyUI", repo_path, instant_execution=instant_execution)
return "updated"
else:
return "skipped"
def lookup_customnode_by_url(data, target):
for x in data['custom_nodes']:
if target in x['files']:
dir_name = os.path.splitext(os.path.basename(target))[0].replace(".git", "")
dir_path = os.path.join(custom_nodes_path, dir_name)
if os.path.exists(dir_path):
x['installed'] = 'True'
elif os.path.exists(dir_path + ".disabled"):
x['installed'] = 'Disabled'
return x
return None
def simple_check_custom_node(url):
dir_name = os.path.splitext(os.path.basename(url))[0].replace(".git", "")
dir_path = os.path.join(custom_nodes_path, dir_name)
if os.path.exists(dir_path):
return 'installed'
elif os.path.exists(dir_path+'.disabled'):
return 'disabled'
return 'not-installed'
def check_a_custom_node_installed(item, do_fetch=False, do_update_check=True, do_update=False):
item['installed'] = 'None'
if item['install_type'] == 'git-clone' and len(item['files']) == 1:
url = item['files'][0]
if url.endswith("/"):
url = url[:-1]
dir_name = os.path.splitext(os.path.basename(url))[0].replace(".git", "")
dir_path = os.path.join(custom_nodes_path, dir_name)
if os.path.exists(dir_path):
try:
item['installed'] = 'True' # default
if cm_global.try_call(api="cm.is_import_failed_extension", name=dir_name):
item['installed'] = 'Fail'
if do_update_check:
update_state, success = git_repo_has_updates(dir_path, do_fetch, do_update)
if (do_update_check or do_update) and update_state:
item['installed'] = 'Update'
elif do_update and not success:
item['installed'] = 'Fail'
except:
if cm_global.try_call(api="cm.is_import_failed_extension", name=dir_name):
item['installed'] = 'Fail'
else:
item['installed'] = 'True'
elif os.path.exists(dir_path + ".disabled"):
item['installed'] = 'Disabled'
else:
item['installed'] = 'False'
elif item['install_type'] == 'copy' and len(item['files']) == 1:
dir_name = os.path.basename(item['files'][0])
if item['files'][0].endswith('.py'):
base_path = custom_nodes_path
elif 'js_path' in item:
base_path = os.path.join(js_path, item['js_path'])
else:
base_path = js_path
file_path = os.path.join(base_path, dir_name)
if os.path.exists(file_path):
if cm_global.try_call(api="cm.is_import_failed_extension", name=dir_name):
item['installed'] = 'Fail'
else:
item['installed'] = 'True'
elif os.path.exists(file_path + ".disabled"):
item['installed'] = 'Disabled'
else:
item['installed'] = 'False'
def get_installed_pip_packages():
# extract pip package infos
pips = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze'], text=True).split('\n')
res = {}
for x in pips:
if x.strip() == "":
continue
if ' @ ' in x:
spec_url = x.split(' @ ')
res[spec_url[0]] = spec_url[1]
else:
res[x] = ""
return res
def get_current_snapshot():
# Get ComfyUI hash
repo_path = comfy_path
if not os.path.exists(os.path.join(repo_path, '.git')):
print(f"ComfyUI update fail: The installed ComfyUI does not have a Git repository.")
return {}
repo = git.Repo(repo_path)
comfyui_commit_hash = repo.head.commit.hexsha
git_custom_nodes = {}
file_custom_nodes = []
# Get custom nodes hash
for path in os.listdir(custom_nodes_path):
fullpath = os.path.join(custom_nodes_path, path)
if os.path.isdir(fullpath):
is_disabled = path.endswith(".disabled")
try:
git_dir = os.path.join(fullpath, '.git')
if not os.path.exists(git_dir):
continue
repo = git.Repo(fullpath)
commit_hash = repo.head.commit.hexsha
url = repo.remotes.origin.url
git_custom_nodes[url] = {
'hash': commit_hash,
'disabled': is_disabled
}
except:
print(f"Failed to extract snapshots for the custom node '{path}'.")
elif path.endswith('.py'):
is_disabled = path.endswith(".py.disabled")
filename = os.path.basename(path)
item = {
'filename': filename,
'disabled': is_disabled
}
file_custom_nodes.append(item)
pip_packages = get_installed_pip_packages()
return {
'comfyui': comfyui_commit_hash,
'git_custom_nodes': git_custom_nodes,
'file_custom_nodes': file_custom_nodes,
'pips': pip_packages,
}
def save_snapshot_with_postfix(postfix, path=None):
if path is None:
now = datetime.now()
date_time_format = now.strftime("%Y-%m-%d_%H-%M-%S")
file_name = f"{date_time_format}_{postfix}"
path = os.path.join(comfyui_manager_path, 'snapshots', f"{file_name}.json")
else:
file_name = path.replace('\\', '/').split('/')[-1]
file_name = file_name.split('.')[-2]
snapshot = get_current_snapshot()
if path.endswith('.json'):
with open(path, "w") as json_file:
json.dump(snapshot, json_file, indent=4)
return file_name + '.json'
elif path.endswith('.yaml'):
with open(path, "w") as yaml_file:
snapshot = {'custom_nodes': snapshot}
yaml.dump(snapshot, yaml_file, allow_unicode=True)
return path
async def extract_nodes_from_workflow(filepath, mode='local', channel_url='default'):
# prepare json data
workflow = None
if filepath.endswith('.json'):
with open(filepath, "r", encoding="UTF-8", errors="ignore") as json_file:
try:
workflow = json.load(json_file)
except:
print(f"Invalid workflow file: {filepath}")
exit(-1)
elif filepath.endswith('.png'):
from PIL import Image
with Image.open(filepath) as img:
if 'workflow' not in img.info:
print(f"The specified .png file doesn't have a workflow: {filepath}")
exit(-1)
else:
try:
workflow = json.loads(img.info['workflow'])
except:
print(f"This is not a valid .png file containing a ComfyUI workflow: {filepath}")
exit(-1)
if workflow is None:
print(f"Invalid workflow file: {filepath}")
exit(-1)
# extract nodes
used_nodes = set()
def extract_nodes(sub_workflow):
for x in sub_workflow['nodes']:
node_name = x.get('type')
# skip virtual nodes
if node_name in ['Reroute', 'Note']:
continue
if node_name is not None and not (node_name.startswith('workflow/') or node_name.startswith('workflow>')):
used_nodes.add(node_name)
if 'nodes' in workflow:
extract_nodes(workflow)
if 'extra' in workflow:
if 'groupNodes' in workflow['extra']:
for x in workflow['extra']['groupNodes'].values():
extract_nodes(x)
# lookup dependent custom nodes
ext_map = await get_data_by_mode(mode, 'extension-node-map.json', channel_url)
rext_map = {}
preemption_map = {}
patterns = []
for k, v in ext_map.items():
if k == 'https://github.com/comfyanonymous/ComfyUI':
for x in v[0]:
if x not in preemption_map:
preemption_map[x] = []
preemption_map[x] = k
continue
for x in v[0]:
if x not in rext_map:
rext_map[x] = []
rext_map[x].append(k)
if 'preemptions' in v[1]:
for x in v[1]['preemptions']:
if x not in preemption_map:
preemption_map[x] = []
preemption_map[x] = k
if 'nodename_pattern' in v[1]:
patterns.append((v[1]['nodename_pattern'], k))
# identify used extensions
used_exts = set()
unknown_nodes = set()
for node_name in used_nodes:
ext = preemption_map.get(node_name)
if ext is None:
ext = rext_map.get(node_name)
if ext is not None:
ext = ext[0]
if ext is None:
for pat_ext in patterns:
if re.search(pat_ext[0], node_name):
ext = pat_ext[1]
break
if ext == 'https://github.com/comfyanonymous/ComfyUI':
pass
elif ext is not None:
if 'Fooocus' in ext:
print(f">> {node_name}")
used_exts.add(ext)
else:
unknown_nodes.add(node_name)
return used_exts, unknown_nodes
def unzip(model_path):
if not os.path.exists(model_path):
print(f"[ComfyUI-Manager] unzip: File not found: {model_path}")
return False
base_dir = os.path.dirname(model_path)
filename = os.path.basename(model_path)
target_dir = os.path.join(base_dir, filename[:-4])
os.makedirs(target_dir, exist_ok=True)
with zipfile.ZipFile(model_path, 'r') as zip_ref:
zip_ref.extractall(target_dir)
# Check if there's only one directory inside the target directory
contents = os.listdir(target_dir)
if len(contents) == 1 and os.path.isdir(os.path.join(target_dir, contents[0])):
nested_dir = os.path.join(target_dir, contents[0])
# Move each file and sub-directory in the nested directory up to the target directory
for item in os.listdir(nested_dir):
shutil.move(os.path.join(nested_dir, item), os.path.join(target_dir, item))
# Remove the now empty nested directory
os.rmdir(nested_dir)
os.remove(model_path)
return True