Spaces:
Sleeping
Sleeping
import os | |
import sys | |
import subprocess | |
import re | |
import shutil | |
import configparser | |
import platform | |
from datetime import datetime | |
import git | |
from git.remote import RemoteProgress | |
from urllib.parse import urlparse | |
from tqdm.auto import tqdm | |
import aiohttp | |
import threading | |
import json | |
import time | |
import yaml | |
import zipfile | |
glob_path = os.path.join(os.path.dirname(__file__)) # ComfyUI-Manager/glob | |
sys.path.append(glob_path) | |
import cm_global | |
from manager_util import * | |
# Manager version as [major, minor]; an optional third element is rendered as a patch suffix.
version = [2, 54]
version_str = "V{}.{}{}".format(version[0], version[1], f'.{version[2]}' if len(version) > 2 else '')

# Directory one level above this file's directory, and the directory above that one.
comfyui_manager_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
custom_nodes_path = os.path.abspath(os.path.join(comfyui_manager_path, os.pardir))

# Filled in lazily by get_default_custom_nodes_path().
default_custom_nodes_path = None
def get_default_custom_nodes_path():
    """Return the primary custom-nodes directory, resolving it on first use.

    The first entry of ``folder_paths.get_folder_paths("custom_nodes")`` is
    used when the ``folder_paths`` module can be imported and queried;
    otherwise the module-level ``custom_nodes_path`` is used. The result is
    stored in the global ``default_custom_nodes_path`` and reused afterwards.
    """
    global default_custom_nodes_path

    if default_custom_nodes_path is None:
        try:
            import folder_paths
            default_custom_nodes_path = folder_paths.get_folder_paths("custom_nodes")[0]
        except Exception:
            # Any failure to import or query folder_paths falls back to the
            # path derived from this file's location. KeyboardInterrupt and
            # SystemExit are deliberately no longer swallowed.
            default_custom_nodes_path = custom_nodes_path

    return default_custom_nodes_path
def get_custom_nodes_paths():
    """Return the list of custom-nodes directories.

    Uses ``folder_paths.get_folder_paths("custom_nodes")`` when available and
    falls back to a single-element list holding ``custom_nodes_path``.
    """
    try:
        import folder_paths
        return folder_paths.get_folder_paths("custom_nodes")
    except Exception:
        # Narrowed from a bare ``except`` so interpreter-exit signals propagate.
        return [custom_nodes_path]
# ComfyUI root: the COMFYUI_PATH environment variable when set, otherwise the
# parent directory of custom_nodes_path.
comfy_path = os.environ.get('COMFYUI_PATH', os.path.abspath(os.path.join(custom_nodes_path, '..')))

# Files and directories that live inside the manager directory.
channel_list_path = os.path.join(comfyui_manager_path, 'channels.list')
config_path = os.path.join(comfyui_manager_path, "config.ini")
startup_script_path = os.path.join(comfyui_manager_path, "startup-scripts")
git_script_path = os.path.join(comfyui_manager_path, "git_helper.py")
cache_dir = os.path.join(comfyui_manager_path, '.cache')

# Lazily populated by get_config().
cached_config = None
js_path = None

# Minimum ComfyUI revision / commit date, and the detected values
# (placeholders until something else in the project assigns them).
comfy_ui_required_revision = 1930
comfy_ui_required_commit_datetime = datetime(2024, 1, 24, 0, 0, 0)
comfy_ui_revision = "Unknown"
comfy_ui_commit_datetime = datetime(1900, 1, 1, 0, 0, 0)

# Guards reads and writes of cached/local JSON files.
cache_lock = threading.Lock()

# Lazily populated caches (see get_channel_dict, get_channel_list, get_installed_packages).
channel_dict = None
channel_list = None
pip_map = None
def remap_pip_package(pkg):
    """Return the override for *pkg* from ``cm_global.pip_overrides``, or *pkg* itself.

    A message is printed whenever a remapping takes place.
    """
    if pkg not in cm_global.pip_overrides:
        return pkg

    res = cm_global.pip_overrides[pkg]
    print(f"[ComfyUI-Manager] '{pkg}' is remapped to '{res}'")
    return res
def get_installed_packages():
    """Return a ``{package_name: version}`` dict parsed from ``pip list``.

    The result is cached in the global ``pip_map``; call ``clear_pip_cache()``
    to force a refresh. If ``pip list`` exits with an error, a message is
    printed and an empty dict is returned without caching, so the next call
    retries.
    """
    global pip_map

    if pip_map is None:
        try:
            result = subprocess.check_output([sys.executable, '-m', 'pip', 'list'], universal_newlines=True)
        except subprocess.CalledProcessError:
            print("[ComfyUI-Manager] Failed to retrieve the information of installed pip packages.")
            # An empty dict keeps the return type consistent with the success
            # path (callers use both ``in`` and subscripting).
            return {}

        packages = {}
        for line in result.split('\n'):
            fields = line.split()
            # Skip blank lines, the "Package Version" header, the dashed
            # separator, and any line without a version column.
            if len(fields) < 2 or fields[0] == 'Package' or fields[0].startswith('-'):
                continue
            packages[fields[0]] = fields[1]

        pip_map = packages

    return pip_map
def clear_pip_cache():
    """Drop the cached ``pip list`` result so the next lookup re-runs pip."""
    global pip_map
    pip_map = None
def is_blacklisted(name):
    """Return True when installing the pip requirement *name* must be skipped.

    *name* may be a bare package name or ``name<op>version``. A package is
    blacklisted when it is listed in ``cm_global.pip_blacklist``, or when it
    is listed in ``cm_global.pip_downgrade_blacklist`` and either
      * no version constraint is given and the package is already installed, or
      * the constraint is ``<=``, ``==`` or ``<`` and the installed version is
        greater than or equal to the requested one.
    """
    name = name.strip()

    # name, comparison operator, version. Whitespace between the operator and
    # the version is tolerated so "pkg >= 1.0" parses like "pkg>=1.0".
    pattern = r'([^<>!=]+)([<>!=]=?)\s*([^ ]*)'
    match = re.search(pattern, name)

    if match:
        # The name group also captures whitespace that preceded the operator.
        name = match.group(1).strip()

    if name in cm_global.pip_blacklist:
        return True

    if name in cm_global.pip_downgrade_blacklist:
        pips = get_installed_packages()

        if match is None:
            if name in pips:
                return True
        elif match.group(2) in ['<=', '==', '<']:
            if name in pips:
                if StrictVersion(pips[name]) >= StrictVersion(match.group(3)):
                    return True

    return False
def is_installed(name):
    """Return True when the pip requirement *name* needs no installation.

    That is the case when *name* is a comment line (starts with ``#``), when
    the package is blacklisted (same rules as the blacklist checks on
    ``cm_global.pip_blacklist`` / ``cm_global.pip_downgrade_blacklist``), or
    when a package of that name is reported by ``get_installed_packages()``.
    """
    name = name.strip()

    if name.startswith('#'):
        return True

    # name, comparison operator, version. Whitespace after the operator is
    # tolerated so "pkg >= 1.0" parses like "pkg>=1.0".
    pattern = r'([^<>!=]+)([<>!=]=?)\s*([0-9.a-zA-Z]*)'
    match = re.search(pattern, name)

    if match:
        # The name group also captures whitespace that preceded the operator.
        name = match.group(1).strip()

    if name in cm_global.pip_blacklist:
        return True

    if name in cm_global.pip_downgrade_blacklist:
        pips = get_installed_packages()

        if match is None:
            if name in pips:
                return True
        elif match.group(2) in ['<=', '==', '<']:
            if name in pips:
                if StrictVersion(pips[name]) >= StrictVersion(match.group(3)):
                    print(f"[ComfyUI-Manager] skip black listed pip installation: '{name}'")
                    return True

    # The keys come from `pip list` with their original capitalisation, so
    # both sides are lowercased; otherwise mixed-case packages never match.
    installed_lower = {pkg.lower() for pkg in get_installed_packages()}
    return name.lower() in installed_lower
def get_channel_dict():
    """Return the ``{channel_name: url}`` mapping parsed from ``channels.list``.

    On first use the file is created from ``channels.list.template`` if it does
    not exist, then every ``name::url`` line is loaded. The mapping is cached
    in the global ``channel_dict``.
    """
    global channel_dict

    if channel_dict is not None:
        return channel_dict

    # Assigned before any I/O, exactly like the original: a failure below
    # leaves an empty (but non-None) cache behind.
    channel_dict = {}

    if not os.path.exists(channel_list_path):
        shutil.copy(channel_list_path+'.template', channel_list_path)

    with open(os.path.join(comfyui_manager_path, 'channels.list'), 'r') as file:
        channels = file.read()

    for entry in channels.split('\n'):
        parts = entry.split("::")
        if len(parts) != 2:
            # not a "name::url" line
            continue
        channel_dict[parts[0]] = parts[1]

    return channel_dict
def get_channel_list():
    """Return the channels as a cached list of ``"name::url"`` strings."""
    global channel_list

    if channel_list is not None:
        return channel_list

    # The list is bound before the channel dict is read, as in the original.
    channel_list = []
    channel_list.extend(f"{k}::{v}" for k, v in get_channel_dict().items())
    return channel_list
class ManagerFuncs:
    """Default implementations of the hooks used by the functions in this module."""

    def __init__(self):
        pass

    def get_current_preview_method(self):
        """Return the preview method name; this default always reports "none"."""
        return "none"

    def run_script(self, cmd, cwd='.'):
        """Run *cmd* in *cwd* with ``COMFYUI_PATH`` set in the environment.

        Commands whose first element starts with ``#`` are reserved markers
        and are not executed. Returns 0; a failing command raises
        ``subprocess.CalledProcessError`` via ``check_call``.
        """
        if cmd and cmd[0].startswith("#"):
            print(f"[ComfyUI-Manager] Unexpected behavior: `{cmd}`")
            return 0

        child_env = dict(os.environ, COMFYUI_PATH=comfy_path)
        subprocess.check_call(cmd, cwd=cwd, env=child_env)
        return 0


# Shared instance used by the module-level functions.
manager_funcs = ManagerFuncs()
def write_config():
    """Write the current configuration to ``config_path`` under the ``default`` section.

    ``preview_method`` is taken from ``manager_funcs``; every other value is
    copied from ``get_config()``.
    """
    preview_method = manager_funcs.get_current_preview_method()
    current = get_config()

    # Keys copied verbatim from the active configuration, in file order.
    copied_keys = (
        'badge_mode',
        'git_exe',
        'channel_url',
        'share_option',
        'bypass_ssl',
        "file_logging",
        'default_ui',
        'component_policy',
        'double_click_policy',
        'windows_selector_event_loop_policy',
        'model_download_by_agent',
        'downgrade_blacklist',
        'security_level',
    )

    section = {'preview_method': preview_method}
    for key in copied_keys:
        section[key] = current[key]

    config = configparser.ConfigParser()
    config['default'] = section

    with open(config_path, 'w') as configfile:
        config.write(configfile)
def read_config():
    """Load the ``default`` section of ``config_path`` into a plain dict.

    Missing keys get their default value. If anything goes wrong while reading
    (for example the file or the section does not exist) the full set of
    defaults is returned instead.
    """
    try:
        parser = configparser.ConfigParser()
        parser.read(config_path)
        section = parser['default']

        def text(key, fallback):
            # raw string value, or the fallback when the key is absent
            return section[key] if key in section else fallback

        def flag(key, fallback):
            # "true" (any case) -> True, any other present value -> False
            return section[key].lower() == 'true' if key in section else fallback

        # policy migration: disable_unsecure_features -> security_level
        if 'disable_unsecure_features' in section:
            legacy_strict = section['disable_unsecure_features'].lower() == 'true'
            security_level = 'strong' if legacy_strict else 'normal'
        else:
            security_level = text('security_level', 'normal')

        return {
            # evaluated lazily so the hook is only called when the key is missing
            'preview_method': section['preview_method'] if 'preview_method' in section else manager_funcs.get_current_preview_method(),
            'badge_mode': text('badge_mode', 'none'),
            'git_exe': text('git_exe', ''),
            'channel_url': text('channel_url', 'https://raw.githubusercontent.com/ltdrdata/ComfyUI-Manager/main'),
            'share_option': text('share_option', 'all'),
            'bypass_ssl': flag('bypass_ssl', False),
            'file_logging': flag('file_logging', True),
            'default_ui': text('default_ui', 'none'),
            'component_policy': text('component_policy', 'workflow'),
            'double_click_policy': text('double_click_policy', 'copy-all'),
            'windows_selector_event_loop_policy': flag('windows_selector_event_loop_policy', False),
            'model_download_by_agent': flag('model_download_by_agent', False),
            'downgrade_blacklist': text('downgrade_blacklist', ''),
            'security_level': security_level
        }

    except Exception:
        return {
            'preview_method': manager_funcs.get_current_preview_method(),
            'badge_mode': 'none',
            'git_exe': '',
            'channel_url': 'https://raw.githubusercontent.com/ltdrdata/ComfyUI-Manager/main',
            'share_option': 'all',
            'bypass_ssl': False,
            'file_logging': True,
            'default_ui': 'none',
            'component_policy': 'workflow',
            'double_click_policy': 'copy-all',
            'windows_selector_event_loop_policy': False,
            'model_download_by_agent': False,
            'downgrade_blacklist': '',
            'security_level': 'normal',
        }
def get_config():
    """Return the configuration dict, reading it from disk on first use only."""
    global cached_config

    if cached_config is not None:
        return cached_config

    cached_config = read_config()
    return cached_config
def switch_to_default_branch(repo):
    """Check out the branch that ``git remote show origin`` reports as HEAD.

    Nothing is checked out when the output has no "HEAD branch:" line.
    """
    remote_info = repo.git.remote("show", "origin")
    head_line = re.search(r"\s*HEAD branch:\s*(.*)", remote_info)

    if not head_line:
        return

    repo.git.checkout(head_line.group(1))
def try_install_script(url, repo_path, install_cmd, instant_execution=False):
    """Run *install_cmd* for the node at *repo_path*, or queue it for the next startup.

    The command is appended to ``startup-scripts/install-scripts.txt`` instead
    of being executed when *instant_execution* is false and either
      * the command starts with a ``#`` marker (reserved/lazy command), or
      * the platform is Windows and the detected ComfyUI commit date is at
        least the required one.

    Otherwise it is executed immediately through ``manager_funcs.run_script``;
    a ``pip install <pkg>`` command for a blacklisted package is skipped.

    Args:
        url: Identifier used in the failure message; when None the parent
            directory of *repo_path* is printed instead.
        repo_path: Working directory for the command.
        install_cmd: Command as a list of strings.
        instant_execution: Execute right away even where deferral would apply.

    Returns:
        True when the command was queued, skipped, or ran with exit code 0
        (this last case previously fell off the end and returned None);
        False when the script reported a non-zero exit code.
    """
    if not instant_execution and (
            (len(install_cmd) > 0 and install_cmd[0].startswith('#'))
            or (platform.system() == "Windows"
                and comfy_ui_commit_datetime.date() >= comfy_ui_required_commit_datetime.date())):
        os.makedirs(startup_script_path, exist_ok=True)

        script_path = os.path.join(startup_script_path, "install-scripts.txt")
        with open(script_path, "a") as file:
            # One Python-list literal per line: [repo_path, *install_cmd]
            obj = [repo_path] + install_cmd
            file.write(f"{obj}\n")

        return True

    # Shape check for "[python, -m, pip, install, <pkg>]".
    if len(install_cmd) == 5 and install_cmd[2:4] == ['pip', 'install']:
        if is_blacklisted(install_cmd[4]):
            print(f"[ComfyUI-Manager] skip black listed pip installation: '{install_cmd[4]}'")
            return True

    print(f"\n## ComfyUI-Manager: EXECUTE => {install_cmd}")
    code = manager_funcs.run_script(install_cmd, cwd=repo_path)

    # NOTE(review): the warning text talks about Windows but the guard is
    # `!= "Windows"`; kept as-is, confirm the intended platform.
    if platform.system() != "Windows":
        try:
            if comfy_ui_commit_datetime.date() < comfy_ui_required_commit_datetime.date():
                print("\n\n###################################################################")
                print(f"[WARN] ComfyUI-Manager: Your ComfyUI version ({comfy_ui_revision})[{comfy_ui_commit_datetime.date()}] is too old. Please update to the latest version.")
                print(f"[WARN] The extension installation feature may not work properly in the current installed ComfyUI version on Windows environment.")
                print("###################################################################\n\n")
        except Exception:
            # The warning is best-effort only.
            pass

    if code != 0:
        if url is None:
            url = os.path.dirname(repo_path)
        print(f"install script failed: {url}")
        return False

    return True
# use subprocess to avoid file system lock by git (Windows) | |
def __win_check_git_update(path, do_fetch=False, do_update=False):
    """Run ``git_helper.py`` in a subprocess to fetch, pull, or check the repo at *path*.

    The helper is invoked with ``--fetch`` when *do_fetch* is set, else
    ``--pull`` when *do_update* is set, else ``--check``. If its output reports
    a "detected dubious" ownership error, the repository is added to git's
    global ``safe.directory`` list and the helper is run once more.

    Returns:
        ``(updated, success)``. In update mode: ``(True, True)`` after a
        successful pull, ``(False, True)`` when there was nothing to pull,
        ``(False, False)`` on error. Otherwise: ``(True, True)`` when the
        helper reports an available update and ``(False, True)`` in every
        other case (including unrecognised output, which is printed).
    """
    if do_fetch:
        command = [sys.executable, git_script_path, "--fetch", path]
    elif do_update:
        command = [sys.executable, git_script_path, "--pull", path]
    else:
        command = [sys.executable, git_script_path, "--check", path]

    new_env = os.environ.copy()
    new_env["COMFYUI_PATH"] = comfy_path
    work_dir = get_default_custom_nodes_path()

    def run_helper():
        # Same environment and working directory for the first run and the
        # retry; undecodable bytes are replaced instead of raising.
        helper = subprocess.Popen(command, env=new_env, cwd=work_dir,
                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, _ = helper.communicate()
        return stdout.decode('utf-8', errors='replace').strip()

    output = run_helper()

    if 'detected dubious' in output:
        # fix and try again
        safedir_path = path.replace('\\', '/')
        try:
            print(f"[ComfyUI-Manager] Try fixing 'dubious repository' error on '{safedir_path}' repo")
            fixer = subprocess.Popen(['git', 'config', '--global', '--add', 'safe.directory', safedir_path],
                                     env=new_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            fixer.communicate()

            output = run_helper()
        except Exception:
            print(f'[ComfyUI-Manager] failed to fixing')

        if 'detected dubious' in output:
            print(f'\n[ComfyUI-Manager] Failed to fixing repository setup. Please execute this command on cmd: \n'
                  f'-----------------------------------------------------------------------------------------\n'
                  f'git config --global --add safe.directory "{safedir_path}"\n'
                  f'-----------------------------------------------------------------------------------------\n')

    # communicate() has already waited for the helper, so no extra wait() is needed.
    if do_update:
        if "CUSTOM NODE PULL: Success" in output:
            print(f"\x1b[2K\rUpdated: {path}")
            return True, True    # updated
        elif "CUSTOM NODE PULL: None" in output:
            return False, True   # there is no update
        else:
            print(f"\x1b[2K\rUpdate error: {path}")
            return False, False  # update failed
    else:
        if "CUSTOM NODE CHECK: True" in output:
            return True, True
        elif "CUSTOM NODE CHECK: False" in output:
            return False, True
        else:
            print(f"\x1b[2K\rFetch error: {path}")
            print(f"\n{output}\n")
            return False, True
def __win_check_git_pull(path):
    """Run ``git_helper.py --pull`` on *path* in a child process and wait for it to finish."""
    child_env = dict(os.environ, COMFYUI_PATH=comfy_path)
    pull_command = [sys.executable, git_script_path, "--pull", path]

    helper = subprocess.Popen(pull_command, env=child_env, cwd=get_default_custom_nodes_path())
    helper.wait()
def execute_install_script(url, repo_path, lazy_mode=False, instant_execution=False):
    """Install the pip requirements and run ``install.py`` of the node at *repo_path*.

    In *lazy_mode* only a ``#LAZY-INSTALL-SCRIPT`` marker command is handed to
    ``try_install_script``. Otherwise every requirement line of
    ``requirements.txt`` (comments stripped, package names remapped) is
    installed one by one, followed by ``install.py`` if present.

    Always returns True.
    """
    if lazy_mode:
        try_install_script(url, repo_path, ["#LAZY-INSTALL-SCRIPT", sys.executable])
        return True

    requirements_path = os.path.join(repo_path, "requirements.txt")
    install_script_path = os.path.join(repo_path, "install.py")

    if os.path.exists(requirements_path):
        print("Install: pip packages")
        pip_fixer = PIPFixer(get_installed_packages())

        with open(requirements_path, "r") as requirements_file:
            for raw_line in requirements_file:
                line = raw_line

                # handle comments: skip whole-line comments, cut trailing ones
                if '#' in line:
                    if line.strip()[0] == '#':
                        print("Line is comment...skipping")
                        continue
                    line = line.split('#')[0].strip()

                package_name = remap_pip_package(line.strip())

                # nothing to install for empty or commented-out entries
                if not package_name or package_name.startswith('#') or package_name.strip() == "":
                    continue

                if '--index-url' in package_name:
                    s = package_name.split('--index-url')
                    install_cmd = [sys.executable, "-m", "pip", "install", s[0].strip(), '--index-url', s[1].strip()]
                else:
                    install_cmd = [sys.executable, "-m", "pip", "install", package_name]

                try_install_script(url, repo_path, install_cmd, instant_execution=instant_execution)

        pip_fixer.fix_broken()

    if os.path.exists(install_script_path):
        print(f"Install: install script")
        try_install_script(url, repo_path, [sys.executable, "install.py"], instant_execution=instant_execution)

    return True
def git_repo_has_updates(path, do_fetch=False, do_update=False):
    """Check (and optionally apply) updates for the git repository at *path*.

    Args:
        path: Directory that must contain a ``.git`` entry.
        do_fetch: Fetch from ``origin`` before comparing.
        do_update: Fetch and pull; the install script is run after a pull that
            moved HEAD.

    Returns:
        ``(updated, success)``. On Windows the result of the subprocess helper
        is returned. Elsewhere, in update mode: ``(True, True)`` when HEAD
        moved, ``(False, True)`` when already up to date, ``(False, False)``
        when the pull failed or changed nothing. In check mode: ``(True, True)``
        when HEAD is detached or the remote branch has a newer commit date,
        otherwise ``(False, True)``.

    Raises:
        ValueError: *path* is not a git repository.
    """
    if do_fetch:
        print(f"\x1b[2K\rFetching: {path}", end='')
    elif do_update:
        print(f"\x1b[2K\rUpdating: {path}", end='')

    # Check if the path is a git repository
    if not os.path.exists(os.path.join(path, '.git')):
        raise ValueError('Not a git repository')

    if platform.system() == "Windows":
        updated, success = __win_check_git_update(path, do_fetch, do_update)
        if updated and success:
            execute_install_script(None, path, lazy_mode=True)
        return updated, success

    repo = git.Repo(path)
    try:
        remote_name = 'origin'
        remote = repo.remote(name=remote_name)

        # Get the current commit hash
        commit_hash = repo.head.commit.hexsha

        if do_fetch or do_update:
            remote.fetch()

        if do_update:
            if repo.head.is_detached:
                switch_to_default_branch(repo)

            # The active branch is resolved only after leaving a detached
            # HEAD; asking for it earlier raises on a detached repository.
            branch_name = repo.active_branch.name
            remote_commit_hash = repo.refs[f'{remote_name}/{branch_name}'].object.hexsha

            if commit_hash == remote_commit_hash:
                return False, True

            try:
                remote.pull()
                repo.git.submodule('update', '--init', '--recursive')
                new_commit_hash = repo.head.commit.hexsha

                if commit_hash != new_commit_hash:
                    execute_install_script(None, path)
                    print(f"\x1b[2K\rUpdated: {path}")
                    return True, True
                else:
                    return False, False

            except Exception as e:
                print(f"\nUpdating failed: {path}\n{e}", file=sys.stderr)
                return False, False

        # A detached HEAD is reported as "update available".
        if repo.head.is_detached:
            return True, True

        # Get commit hash of the remote branch
        branch_name = repo.active_branch.name
        remote_ref = repo.refs[f'{remote_name}/{branch_name}']

        # The local repository is behind only when the hashes differ and the
        # remote commit is newer than the local one.
        if commit_hash != remote_ref.object.hexsha:
            commit_date = repo.head.commit.committed_datetime
            remote_commit_date = remote_ref.object.committed_datetime
            if commit_date < remote_commit_date:
                return True, True

        return False, True
    finally:
        # Release git handles on every exit path.
        repo.close()
class GitProgress(RemoteProgress):
    """Progress handler that mirrors git transfer progress onto a tqdm bar."""

    def __init__(self):
        super().__init__()
        self.pbar = tqdm()

    def update(self, op_code, cur_count, max_count=None, message=''):
        """Copy the reported counts onto the progress bar and redraw it."""
        bar = self.pbar
        bar.total = max_count
        bar.n = cur_count
        bar.pos = 0
        bar.refresh()
def is_valid_url(url):
    """Return True when *url* looks like a usable git remote URL.

    Accepted forms are URLs with both a scheme and a network location
    (``https://host/path``, ``ssh://user@host/path``) and scp-style SSH
    remotes (``user@host:path``). Anything else, including non-string input
    and URLs that ``urlparse`` rejects, yields False.
    """
    if not isinstance(url, str):
        return False

    try:
        # Check for HTTP/HTTPS URL format
        parsed = urlparse(url)
        if parsed.scheme and parsed.netloc:
            return True
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unbalanced '[');
        # the SSH pattern below is still tried for those.
        pass

    # Check for SSH git URL format
    return re.match(r"^(.+@|ssh:\/\/).+:.+$", url) is not None
def gitclone_install(files, instant_execution=False, msg_prefix=''):
    """Clone every repository URL in *files* into the default custom-nodes directory.

    After each clone the node's install script is executed. Processing stops
    at the first invalid URL or failure.

    Returns:
        True when every URL was installed, False otherwise.
    """
    print(f"{msg_prefix}Install: {files}")

    for url in files:
        if not is_valid_url(url):
            print(f"Invalid git url: '{url}'")
            return False

        url = url[:-1] if url.endswith("/") else url

        try:
            print(f"Download: git clone '{url}'")
            nodes_dir = get_default_custom_nodes_path()
            repo_name = os.path.splitext(os.path.basename(url))[0]
            repo_path = os.path.join(nodes_dir, repo_name)

            # Clone the repository from the remote URL
            clone_via_helper = not instant_execution and platform.system() == 'Windows'
            if clone_via_helper:
                clone_cmd = [sys.executable, git_script_path, "--clone", nodes_dir, url]
                res = manager_funcs.run_script(clone_cmd, cwd=nodes_dir)
                if res != 0:
                    return False
            else:
                repo = git.Repo.clone_from(url, repo_path, recursive=True, progress=GitProgress())
                repo.git.clear_cache()
                repo.close()

            installed = execute_install_script(url, repo_path, instant_execution=instant_execution)
            if not installed:
                return False

        except Exception as e:
            print(f"Install(git-clone) error: {url} / {e}", file=sys.stderr)
            return False

    print("Installation was successful.")
    return True
def git_pull(path):
    """Pull the latest changes (and submodules) for the repository at *path*.

    On Windows the work is delegated to the subprocess helper and its result
    is returned. Elsewhere local changes are stashed, a detached HEAD is
    switched to the remote's default branch, and the current branch is pulled
    from its tracking remote. When the branch has no tracking branch, the
    branch of the same name is pulled from ``origin`` instead of failing with
    an ``AttributeError``.

    Returns:
        True after a successful pull on non-Windows platforms.

    Raises:
        ValueError: *path* is not a git repository.
    """
    # Check if the path is a git repository
    if not os.path.exists(os.path.join(path, '.git')):
        raise ValueError('Not a git repository')

    # Pull the latest changes from the remote repository
    if platform.system() == "Windows":
        return __win_check_git_pull(path)

    repo = git.Repo(path)
    try:
        if repo.is_dirty():
            repo.git.stash()

        if repo.head.is_detached:
            switch_to_default_branch(repo)

        current_branch = repo.active_branch
        tracking = current_branch.tracking_branch()

        if tracking is None:
            # Same fallback remote that update_path() uses for untracked branches.
            print(f"[ComfyUI-Manager] There is no tracking branch ({current_branch})")
            repo.remote(name='origin').pull(current_branch.name)
        else:
            repo.remote(name=tracking.remote_name).pull()

        repo.git.submodule('update', '--init', '--recursive')
    finally:
        # Release git handles even when the pull fails.
        repo.close()

    return True
async def get_data(uri, silent=False):
    """Load and parse JSON from *uri*.

    URIs starting with ``http`` are downloaded with aiohttp; everything else
    is read as a UTF-8 file while holding ``cache_lock``. Progress is printed
    unless *silent* is set.
    """
    if not silent:
        print(f"FETCH DATA from: {uri}", end="")

    if not uri.startswith("http"):
        with cache_lock, open(uri, "r", encoding="utf-8") as f:
            json_text = f.read()
    else:
        # NOTE(review): certificate verification is disabled unconditionally here.
        connector = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(trust_env=True, connector=connector) as session:
            async with session.get(uri) as resp:
                json_text = await resp.text()

    json_obj = json.loads(json_text)

    if not silent:
        print(f" [DONE]")

    return json_obj
def simple_hash(input_string):
    """Return a 32-bit polynomial (base 31) hash of *input_string*."""
    acc = 0
    for code_point in map(ord, input_string):
        # keep only the low 32 bits after every step
        acc = (acc * 31 + code_point) & 0xFFFFFFFF
    return acc
def is_file_created_within_one_day(file_path):
    """Return True when *file_path* exists and its ctime is at most 86400 seconds old."""
    if os.path.exists(file_path):
        age_seconds = datetime.now().timestamp() - os.path.getctime(file_path)
        return age_seconds <= 86400

    return False
async def get_data_by_mode(mode, filename, channel_url=None):
    """Fetch the JSON document *filename* according to *mode*.

    * ``"local"``: read the copy shipped in the manager directory.
    * ``"cache"``: use the cached copy when it is less than a day old,
      otherwise download it and refresh the cache.
    * anything else: always download and refresh the cache.

    *channel_url* may be a channel name from ``get_channel_dict()`` or a URL;
    when None the configured ``channel_url`` is used. Any error falls back to
    the local copy.
    """
    if channel_url in get_channel_dict():
        channel_url = get_channel_dict()[channel_url]

    try:
        if mode == "local":
            json_obj = await get_data(os.path.join(comfyui_manager_path, filename))
        else:
            base_url = get_config()['channel_url'] if channel_url is None else channel_url
            uri = base_url + '/' + filename

            cache_uri = os.path.join(cache_dir, str(simple_hash(uri))+'_'+filename)

            if mode == "cache" and is_file_created_within_one_day(cache_uri):
                json_obj = await get_data(cache_uri)
            else:
                # download, then (re)write the cache entry
                json_obj = await get_data(uri)
                with cache_lock:
                    with open(cache_uri, "w", encoding='utf-8') as file:
                        json.dump(json_obj, file, indent=4, sort_keys=True)
    except Exception as e:
        print(f"[ComfyUI-Manager] Due to a network error, switching to local mode.\n=> {filename}\n=> {e}")
        json_obj = await get_data(os.path.join(comfyui_manager_path, filename))

    return json_obj
def lookup_installed_custom_nodes(repo_name):
    """Locate *repo_name* in the custom-nodes directories.

    Returns:
        ``(True, path)`` when ``path`` exists, ``(False, path)`` when only
        ``path + ".disabled"`` exists (the returned path has no suffix), or
        None when neither is found in any directory.
    """
    try:
        import folder_paths
        search_roots = folder_paths.get_folder_paths("custom_nodes")
    except:
        search_roots = [custom_nodes_path]

    for root in search_roots:
        candidate = os.path.join(root, repo_name)

        if os.path.exists(candidate):
            return True, candidate

        if os.path.exists(candidate+'.disabled'):
            return False, candidate

    return None
def gitclone_fix(files, instant_execution=False):
    """Re-run the install script of every already-installed node listed in *files*.

    Returns:
        False on the first invalid URL or failure, True otherwise. Nodes that
        are not installed are only reported.
    """
    print(f"Try fixing: {files}")

    for url in files:
        if not is_valid_url(url):
            print(f"Invalid git url: '{url}'")
            return False

        url = url[:-1] if url.endswith("/") else url

        try:
            repo_name = os.path.splitext(os.path.basename(url))[0]
            found = lookup_installed_custom_nodes(repo_name)

            if found is None:
                print(f"Custom node not found: {repo_name}")
            else:
                node_path = found[1]
                if not execute_install_script(url, node_path, instant_execution=instant_execution):
                    return False

        except Exception as e:
            print(f"Install(git-clone) error: {url} / {e}", file=sys.stderr)
            return False

    print(f"Attempt to fixing '{files}' is done.")
    return True
def pip_install(packages):
    """Hand a ``pip install -U`` of *packages* to ``try_install_script`` as a ``#FORCE`` command."""
    pip_prefix = ['#FORCE', sys.executable, "-m", "pip", "install", '-U']
    try_install_script('pip install via manager', '..', pip_prefix + packages)
def rmtree(path):
    """Delete the directory tree at *path*, retrying after failures.

    Up to four attempts are made with a three second pause after each failed
    one; the last error is re-raised. On Windows the read-only attribute is
    cleared before every attempt. Returns True on success.
    """
    # `remaining` takes the same values the original countdown went through.
    for remaining in range(2, -2, -1):
        try:
            if platform.system() == "Windows":
                manager_funcs.run_script(['attrib', '-R', path + '\\*', '/S'])
            shutil.rmtree(path)
            return True

        except Exception as ex:
            print(f"ex: {ex}")
            time.sleep(3)

            if remaining < 0:
                raise ex

            print(f"Uninstall retry({remaining})")
def gitclone_uninstall(files):
    """Remove the installed node directory for every URL in *files*.

    ``uninstall.py`` is run first when the node ships one, otherwise
    ``disable.py``; the directory is deleted afterwards. URLs without an
    installed node are skipped.

    Returns:
        False on the first error, True otherwise.
    """
    print(f"Uninstall: {files}")

    for url in files:
        url = url[:-1] if url.endswith("/") else url

        try:
            dir_name = os.path.splitext(os.path.basename(url))[0].replace(".git", "")
            found = lookup_installed_custom_nodes(dir_name)
            if found is None:
                continue

            dir_path = found[1]

            # (script file, message printed when it exits with a non-zero code)
            if os.path.exists(os.path.join(dir_path, "uninstall.py")):
                code = manager_funcs.run_script([sys.executable, "uninstall.py"], cwd=dir_path)
                if code != 0:
                    print(f"An error occurred during the execution of the uninstall.py script. Only the '{dir_path}' will be deleted.")
            elif os.path.exists(os.path.join(dir_path, "disable.py")):
                code = manager_funcs.run_script([sys.executable, "disable.py"], cwd=dir_path)
                if code != 0:
                    print(f"An error occurred during the execution of the disable.py script. Only the '{dir_path}' will be deleted.")

            rmtree(dir_path)

        except Exception as e:
            print(f"Uninstall(git-clone) error: {url} / {e}", file=sys.stderr)
            return False

    print("Uninstallation was successful.")
    return True
def gitclone_set_active(files, is_disable):
    """Enable or disable the nodes in *files* by toggling the ``.disabled`` directory suffix.

    After the rename, ``disable.py`` / ``enable.py`` is handed to
    ``try_install_script`` when the node provides it. URLs without an
    installed node are skipped.

    Returns:
        False on the first error, True otherwise.
    """
    action_name = "Disable" if is_disable else "Enable"
    hook_script = "disable.py" if is_disable else "enable.py"

    print(f"{action_name}: {files}")

    for url in files:
        url = url[:-1] if url.endswith("/") else url

        try:
            dir_name = os.path.splitext(os.path.basename(url))[0].replace(".git", "")
            found = lookup_installed_custom_nodes(dir_name)
            if found is None:
                continue

            dir_path = found[1]
            disabled_path = dir_path + ".disabled"

            if is_disable:
                current_path, new_path = dir_path, disabled_path
            else:
                current_path, new_path = disabled_path, dir_path

            os.rename(current_path, new_path)

            # run the node's own hook for this action, if it has one
            if os.path.exists(os.path.join(new_path, hook_script)):
                try_install_script(url, new_path, [sys.executable, hook_script])

        except Exception as e:
            print(f"{action_name}(git-clone) error: {url} / {e}", file=sys.stderr)
            return False

    print(f"{action_name} was successful.")
    return True
def gitclone_update(files, instant_execution=False, skip_script=False, msg_prefix=""):
    """Pull every installed node in *files* and, unless *skip_script*, run its install script.

    With *instant_execution* the install script runs immediately; otherwise it
    is requested in lazy mode. URLs without an installed node are skipped.

    Returns:
        False on the first error, True otherwise.
    """
    print(f"{msg_prefix}Update: {files}")

    for url in files:
        url = url[:-1] if url.endswith("/") else url

        try:
            repo_name = os.path.splitext(os.path.basename(url))[0].replace(".git", "")
            found = lookup_installed_custom_nodes(repo_name)
            if found is None:
                continue

            repo_path = found[1]
            git_pull(repo_path)

            if skip_script:
                continue

            if instant_execution:
                script_ok = execute_install_script(url, repo_path, lazy_mode=False, instant_execution=True)
            else:
                script_ok = execute_install_script(url, repo_path, lazy_mode=True)

            if not script_ok:
                return False

        except Exception as e:
            print(f"Update(git-clone) error: {url} / {e}", file=sys.stderr)
            return False

    if not skip_script:
        print("Update was successful.")

    return True
def update_path(repo_path, instant_execution=False):
    """Update the git repository at *repo_path* when its remote branch has moved.

    Returns:
        ``"fail"`` when *repo_path* is not a git repository, ``"updated"``
        after pulling and running the install script, ``"skipped"`` when the
        local and remote commit hashes already match.
    """
    if not os.path.exists(os.path.join(repo_path, '.git')):
        return "fail"

    # version check
    repo = git.Repo(repo_path)

    if repo.head.is_detached:
        switch_to_default_branch(repo)

    current_branch = repo.active_branch
    branch_name = current_branch.name

    tracking = current_branch.tracking_branch()
    if tracking is None:
        print(f"[ComfyUI-Manager] There is no tracking branch ({current_branch})")
        remote_name = 'origin'
    else:
        remote_name = tracking.remote_name

    remote = repo.remote(name=remote_name)

    try:
        remote.fetch()
    except Exception as e:
        if 'detected dubious' in str(e):
            print(f"[ComfyUI-Manager] Try fixing 'dubious repository' error on 'ComfyUI' repository")
            # NOTE(review): the safe.directory entry is built from comfy_path,
            # not from repo_path - confirm this is intended for all callers.
            safedir_path = comfy_path.replace('\\', '/')
            subprocess.run(['git', 'config', '--global', '--add', 'safe.directory', safedir_path])
            try:
                remote.fetch()
            except Exception:
                print(f"\n[ComfyUI-Manager] Failed to fixing repository setup. Please execute this command on cmd: \n"
                      f"-----------------------------------------------------------------------------------------\n"
                      f'git config --global --add safe.directory "{safedir_path}"\n'
                      f"-----------------------------------------------------------------------------------------\n")

    local_hash = repo.head.commit.hexsha
    remote_hash = repo.refs[f'{remote_name}/{branch_name}'].object.hexsha

    if local_hash == remote_hash:
        return "skipped"

    git_pull(repo_path)
    execute_install_script("ComfyUI", repo_path, instant_execution=instant_execution)
    return "updated"
def lookup_customnode_by_url(data, target):
    """Return the first installed entry of ``data['custom_nodes']`` whose files contain *target*.

    The entry's ``installed`` field is set to ``'True'`` or ``'Disabled'``
    before it is returned. None is returned when no matching entry is
    installed.
    """
    for node in data['custom_nodes']:
        if target not in node['files']:
            continue

        dir_name = os.path.splitext(os.path.basename(target))[0].replace(".git", "")
        found = lookup_installed_custom_nodes(dir_name)
        if found is None:
            continue

        node['installed'] = 'True' if found[0] else 'Disabled'
        return node

    return None
def simple_check_custom_node(url):
    """Return ``'installed'``, ``'disabled'`` or ``'not-installed'`` for the node behind *url*."""
    dir_name = os.path.splitext(os.path.basename(url))[0].replace(".git", "")
    found = lookup_installed_custom_nodes(dir_name)

    if found is None:
        return 'not-installed'

    return 'installed' if found[0] else 'disabled'
def check_a_custom_node_installed(item, do_fetch=False, do_update_check=True, do_update=False):
    """Set ``item['installed']`` to the installation state of the described node.

    Possible values: ``'None'`` (unsupported item), ``'False'``, ``'True'``,
    ``'Disabled'``, ``'Fail'`` (import failure or failed update) and
    ``'Update'`` (an update is available or was applied).

    Args:
        item: Node description with ``install_type`` and ``files`` (and
            optionally ``js_path``); modified in place.
        do_fetch: Passed through to the git update check.
        do_update_check: Run the git update check for git-clone nodes.
        do_update: Passed through to the git update check.
    """
    item['installed'] = 'None'

    if item['install_type'] == 'git-clone' and len(item['files']) == 1:
        url = item['files'][0]

        if url.endswith("/"):
            url = url[:-1]

        dir_name = os.path.splitext(os.path.basename(url))[0].replace(".git", "")
        repo_path = lookup_installed_custom_nodes(dir_name)

        if repo_path is None:
            item['installed'] = 'False'
        elif repo_path[0]:
            dir_path = repo_path[1]
            try:
                item['installed'] = 'True'  # default

                if cm_global.try_call(api="cm.is_import_failed_extension", name=dir_name):
                    item['installed'] = 'Fail'

                if do_update_check:
                    update_state, success = git_repo_has_updates(dir_path, do_fetch, do_update)
                    if update_state:
                        item['installed'] = 'Update'
                    elif do_update and not success:
                        item['installed'] = 'Fail'
            except Exception:
                # The update check is best-effort: fall back to the plain
                # installed / import-failed state.
                if cm_global.try_call(api="cm.is_import_failed_extension", name=dir_name):
                    item['installed'] = 'Fail'
                else:
                    item['installed'] = 'True'
        else:
            item['installed'] = 'Disabled'

    elif item['install_type'] == 'copy' and len(item['files']) == 1:
        dir_name = os.path.basename(item['files'][0])

        if item['files'][0].endswith('.py'):
            # Look the file up by its name: joining the full entry (typically
            # a download URL) onto the custom-nodes directory can never match.
            base_path = lookup_installed_custom_nodes(dir_name)
            if base_path is None:
                item['installed'] = 'False'
            elif base_path[0]:
                item['installed'] = 'True'
            else:
                item['installed'] = 'Disabled'
            return
        elif 'js_path' in item:
            base_path = os.path.join(js_path, item['js_path'])
        else:
            base_path = js_path

        file_path = os.path.join(base_path, dir_name)
        if os.path.exists(file_path):
            if cm_global.try_call(api="cm.is_import_failed_extension", name=dir_name):
                item['installed'] = 'Fail'
            else:
                item['installed'] = 'True'
        else:
            item['installed'] = 'False'
def get_installed_pip_packages():
    """Return the output of ``pip freeze`` for the running interpreter as a dict.

    Each non-blank output line becomes one entry.  A line containing
    ``' @ '`` is split on that separator, mapping the first piece to the
    second; any other line is used verbatim as the key with ``""`` as its
    value.
    """
    freeze_output = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze'], text=True)

    packages = {}
    for line in freeze_output.split('\n'):
        if not line.strip():
            continue

        if ' @ ' not in line:
            packages[line] = ""
            continue

        pieces = line.split(' @ ')
        packages[pieces[0]] = pieces[1]

    return packages
def get_current_snapshot():
    """Collect the current ComfyUI, custom-node and pip state as a dict.

    Returns:
        ``{}`` when ``comfy_path`` contains no ``.git`` entry.  Otherwise a
        dict with the keys:

        * ``comfyui``: commit hash of the ComfyUI checkout.
        * ``git_custom_nodes``: ``{origin_url: {'hash': ..., 'disabled': ...}}``
          for every custom-node directory that contains a ``.git`` entry.
        * ``file_custom_nodes``: list of ``{'filename': ..., 'disabled': ...}``
          for single-file nodes (``*.py`` and ``*.py.disabled``); ``filename``
          never carries the ``.disabled`` suffix.
        * ``pips``: result of ``get_installed_pip_packages()``.
    """
    # Get ComfyUI hash
    repo_path = comfy_path

    if not os.path.exists(os.path.join(repo_path, '.git')):
        print("ComfyUI update fail: The installed ComfyUI does not have a Git repository.")
        return {}

    repo = git.Repo(repo_path)
    comfyui_commit_hash = repo.head.commit.hexsha

    git_custom_nodes = {}
    file_custom_nodes = []

    try:
        import folder_paths
        base_paths = folder_paths.get_folder_paths("custom_nodes")
    except Exception:
        # folder_paths is unavailable here; fall back to the directory that
        # contains this manager.
        base_paths = [custom_nodes_path]

    disabled_suffix = '.disabled'

    # Get custom nodes hash
    for base_path in base_paths:
        if not os.path.isdir(base_path):
            continue

        for path in os.listdir(base_path):
            fullpath = os.path.join(base_path, path)

            if os.path.isdir(fullpath):
                # Directories without a .git entry are not git-managed nodes.
                if not os.path.exists(os.path.join(fullpath, '.git')):
                    continue

                try:
                    node_repo = git.Repo(fullpath)
                    git_custom_nodes[node_repo.remotes.origin.url] = {
                        'hash': node_repo.head.commit.hexsha,
                        'disabled': path.endswith(disabled_suffix)
                    }
                except Exception:
                    print(f"Failed to extract snapshots for the custom node '{path}'.")

            elif path.endswith(('.py', '.py' + disabled_suffix)):
                # Match both spellings: testing for ".py.disabled" only after
                # an endswith('.py') check could never be true, so disabled
                # single-file nodes were silently left out of the snapshot.
                is_disabled = path.endswith('.py' + disabled_suffix)
                filename = path[:-len(disabled_suffix)] if is_disabled else path
                file_custom_nodes.append({
                    'filename': filename,
                    'disabled': is_disabled
                })

    pip_packages = get_installed_pip_packages()

    return {
        'comfyui': comfyui_commit_hash,
        'git_custom_nodes': git_custom_nodes,
        'file_custom_nodes': file_custom_nodes,
        'pips': pip_packages,
    }
def save_snapshot_with_postfix(postfix, path=None):
    """Write the current snapshot to disk as JSON or YAML.

    Args:
        postfix: Suffix appended to the timestamp when ``path`` is omitted;
            the file is then ``<manager>/snapshots/<timestamp>_<postfix>.json``.
        path: Explicit destination.  The extension selects the format:
            ``.json`` or ``.yaml``.

    Returns:
        For ``.json`` the written file's base name (``<name>.json``); for
        ``.yaml`` the ``path`` argument itself; ``None`` for any other
        extension, in which case nothing is written.
    """
    if path is None:
        date_time_format = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        file_name = f"{date_time_format}_{postfix}"

        snapshot_dir = os.path.join(comfyui_manager_path, 'snapshots')
        # A fresh install may not have the snapshots directory yet.
        os.makedirs(snapshot_dir, exist_ok=True)
        path = os.path.join(snapshot_dir, f"{file_name}.json")
    else:
        # Strip only the final extension: names containing extra dots stay
        # whole and names without any dot no longer raise IndexError.
        base_name = path.replace('\\', '/').split('/')[-1]
        file_name = os.path.splitext(base_name)[0]

    snapshot = get_current_snapshot()
    if path.endswith('.json'):
        with open(path, "w", encoding="utf-8") as json_file:
            json.dump(snapshot, json_file, indent=4)

        return file_name + '.json'

    elif path.endswith('.yaml'):
        # allow_unicode=True emits raw non-ASCII characters, so the file
        # must not depend on the platform's default encoding.
        with open(path, "w", encoding="utf-8") as yaml_file:
            snapshot = {'custom_nodes': snapshot}
            yaml.dump(snapshot, yaml_file, allow_unicode=True)

        return path
async def extract_nodes_from_workflow(filepath, mode='local', channel_url='default'):
    """Resolve which extensions a workflow file depends on.

    The workflow is read from a ``.json`` file or from the ``workflow`` entry
    of a ``.png`` file's metadata.  Node type names are collected from the
    top-level ``nodes`` list and from ``extra.groupNodes``; they are then
    mapped to extension keys using ``extension-node-map.json`` obtained
    through ``get_data_by_mode``.

    Each value of that map is indexed as ``v[0]`` (node names) and ``v[1]``
    (a mapping that may contain ``preemptions`` and ``nodename_pattern``).
    Lookup order per node is: preemption entry, first extension listing the
    node, then the first matching name pattern.

    Args:
        filepath: Path to the ``.json`` or ``.png`` workflow file.
        mode: Forwarded to ``get_data_by_mode``.
        channel_url: Forwarded to ``get_data_by_mode``.

    Returns:
        ``(used_exts, unknown_nodes)``: a set of extension keys, and a set of
        node names no extension could be found for.  Nodes resolved to the
        ComfyUI core key appear in neither set.

    Raises:
        SystemExit: With code ``-1`` when the file is of an unsupported type,
            cannot be parsed, or carries no workflow.
    """
    comfyui_core = 'https://github.com/comfyanonymous/ComfyUI'

    # prepare json data
    workflow = None
    if filepath.endswith('.json'):
        with open(filepath, "r", encoding="UTF-8", errors="ignore") as json_file:
            try:
                workflow = json.load(json_file)
            except (ValueError, RecursionError):
                print(f"Invalid workflow file: {filepath}")
                # sys.exit rather than the site-provided exit(), which is not
                # defined when the interpreter runs without the site module.
                sys.exit(-1)

    elif filepath.endswith('.png'):
        from PIL import Image

        with Image.open(filepath) as img:
            if 'workflow' not in img.info:
                print(f"The specified .png file doesn't have a workflow: {filepath}")
                sys.exit(-1)

            try:
                workflow = json.loads(img.info['workflow'])
            except (TypeError, ValueError, RecursionError):
                print(f"This is not a valid .png file containing a ComfyUI workflow: {filepath}")
                sys.exit(-1)

    if workflow is None:
        print(f"Invalid workflow file: {filepath}")
        sys.exit(-1)

    # extract nodes
    used_nodes = set()

    def extract_nodes(sub_workflow):
        for node in sub_workflow['nodes']:
            node_name = node.get('type')

            # skip untyped entries and virtual nodes
            if not isinstance(node_name, str) or node_name in ('Reroute', 'Note'):
                continue

            # group-node instances are expanded through extra.groupNodes below
            if not node_name.startswith(('workflow/', 'workflow>')):
                used_nodes.add(node_name)

    if 'nodes' in workflow:
        extract_nodes(workflow)

        if 'extra' in workflow and 'groupNodes' in workflow['extra']:
            for group in workflow['extra']['groupNodes'].values():
                extract_nodes(group)

    # lookup dependent custom nodes
    ext_map = await get_data_by_mode(mode, 'extension-node-map.json', channel_url)

    rext_map = {}        # node name -> list of extension keys providing it
    preemption_map = {}  # node name -> extension key that takes precedence
    patterns = []        # (regex, extension key) fallbacks
    for ext_key, info in ext_map.items():
        if ext_key == comfyui_core:
            for node_name in info[0]:
                preemption_map[node_name] = ext_key
            continue

        for node_name in info[0]:
            rext_map.setdefault(node_name, []).append(ext_key)

        if 'preemptions' in info[1]:
            for node_name in info[1]['preemptions']:
                preemption_map[node_name] = ext_key

        if 'nodename_pattern' in info[1]:
            patterns.append((info[1]['nodename_pattern'], ext_key))

    # identify used extensions
    used_exts = set()
    unknown_nodes = set()

    for node_name in used_nodes:
        ext = preemption_map.get(node_name)

        if ext is None:
            candidates = rext_map.get(node_name)
            if candidates is not None:
                ext = candidates[0]

        if ext is None:
            for pattern, pattern_ext in patterns:
                if re.search(pattern, node_name):
                    ext = pattern_ext
                    break

        if ext == comfyui_core:
            pass
        elif ext is not None:
            # NOTE(review): this print looks like leftover debug output; it is
            # kept so the emitted text stays unchanged.
            if 'Fooocus' in ext:
                print(f">> {node_name}")

            used_exts.add(ext)
        else:
            unknown_nodes.add(node_name)

    return used_exts, unknown_nodes
def unzip(model_path):
    """Extract an archive next to itself and delete the archive.

    The archive is extracted into a sibling directory named after the file
    with its last four characters removed (the ``.zip`` suffix).  If the
    extracted content is a single directory, that wrapper is flattened: its
    children are moved up into the target directory and the wrapper is
    removed.

    Args:
        model_path: Path of the archive to extract.

    Returns:
        ``False`` if ``model_path`` does not exist, ``True`` once the archive
        has been extracted and removed.  Errors from ``zipfile`` / ``shutil``
        / ``os`` propagate to the caller.
    """
    if not os.path.exists(model_path):
        print(f"[ComfyUI-Manager] unzip: File not found: {model_path}")
        return False

    base_dir = os.path.dirname(model_path)
    filename = os.path.basename(model_path)
    target_dir = os.path.join(base_dir, filename[:-4])

    os.makedirs(target_dir, exist_ok=True)

    with zipfile.ZipFile(model_path, 'r') as zip_ref:
        zip_ref.extractall(target_dir)

    # Check if there's only one directory inside the target directory
    contents = os.listdir(target_dir)
    if len(contents) == 1 and os.path.isdir(os.path.join(target_dir, contents[0])):
        wrapper_name = contents[0]
        wrapper_dir = os.path.join(target_dir, wrapper_name)
        children = os.listdir(wrapper_dir)

        # A child may share the wrapper's name (e.g. "pack/pack/..."). Moving
        # it up would then target the wrapper itself and shutil.move would
        # refuse, so first park the wrapper under a name no child uses.
        # casefold() also covers case-insensitive file systems.
        taken = {child.casefold() for child in children}
        staging_name = wrapper_name
        while staging_name.casefold() in taken:
            staging_name += '_'
        if staging_name != wrapper_name:
            staging_dir = os.path.join(target_dir, staging_name)
            os.rename(wrapper_dir, staging_dir)
            wrapper_dir = staging_dir

        # Move each file and sub-directory in the wrapper up to the target directory
        for child in children:
            shutil.move(os.path.join(wrapper_dir, child), os.path.join(target_dir, child))

        # Remove the now empty wrapper directory
        os.rmdir(wrapper_dir)

    os.remove(model_path)
    return True