import os import sys import traceback import json import asyncio import subprocess import shutil import concurrent import threading from typing import Optional import typer from rich import print from typing_extensions import List, Annotated import re import git sys.path.append(os.path.dirname(__file__)) sys.path.append(os.path.join(os.path.dirname(__file__), "glob")) import manager_core as core import cm_global comfyui_manager_path = os.path.dirname(__file__) comfy_path = os.environ.get('COMFYUI_PATH') if comfy_path is None: print(f"\n[bold yellow]WARN: The `COMFYUI_PATH` environment variable is not set. Assuming `custom_nodes/ComfyUI-Manager/../../` as the ComfyUI path.[/bold yellow]", file=sys.stderr) comfy_path = os.path.abspath(os.path.join(comfyui_manager_path, '..', '..')) startup_script_path = os.path.join(comfyui_manager_path, "startup-scripts") custom_nodes_path = os.path.join(comfy_path, 'custom_nodes') script_path = os.path.join(startup_script_path, "install-scripts.txt") restore_snapshot_path = os.path.join(startup_script_path, "restore-snapshot.json") pip_overrides_path = os.path.join(comfyui_manager_path, "pip_overrides.json") git_script_path = os.path.join(comfyui_manager_path, "git_helper.py") cm_global.pip_blacklist = ['torch', 'torchsde', 'torchvision'] cm_global.pip_downgrade_blacklist = ['torch', 'torchsde', 'torchvision', 'transformers', 'safetensors', 'kornia'] cm_global.pip_overrides = {} if os.path.exists(pip_overrides_path): with open(pip_overrides_path, 'r', encoding="UTF-8", errors="ignore") as json_file: cm_global.pip_overrides = json.load(json_file) cm_global.pip_overrides['numpy'] = 'numpy<2' def check_comfyui_hash(): repo = git.Repo(comfy_path) core.comfy_ui_revision = len(list(repo.iter_commits('HEAD'))) comfy_ui_hash = repo.head.commit.hexsha cm_global.variables['comfyui.revision'] = core.comfy_ui_revision core.comfy_ui_commit_datetime = repo.head.commit.committed_datetime check_comfyui_hash() # This is a preparation step for manager_core def read_downgrade_blacklist(): try: import configparser config_path = os.path.join(os.path.dirname(__file__), "config.ini") config = configparser.ConfigParser() config.read(config_path) default_conf = config['default'] if 'downgrade_blacklist' in default_conf: items = default_conf['downgrade_blacklist'].split(',') items = [x.strip() for x in items if x != ''] cm_global.pip_downgrade_blacklist += items cm_global.pip_downgrade_blacklist = list(set(cm_global.pip_downgrade_blacklist)) except: pass read_downgrade_blacklist() # This is a preparation step for manager_core class Ctx: def __init__(self): self.channel = 'default' self.mode = 'remote' self.processed_install = set() self.custom_node_map_cache = None def set_channel_mode(self, channel, mode): if mode is not None: self.mode = mode valid_modes = ["remote", "local", "cache"] if mode and mode.lower() not in valid_modes: typer.echo( f"Invalid mode: {mode}. Allowed modes are 'remote', 'local', 'cache'.", err=True, ) exit(1) if channel is not None: self.channel = channel def post_install(self, url): try: repository_name = url.split("/")[-1].strip() repo_path = os.path.join(custom_nodes_path, repository_name) repo_path = os.path.abspath(repo_path) requirements_path = os.path.join(repo_path, 'requirements.txt') install_script_path = os.path.join(repo_path, 'install.py') if os.path.exists(requirements_path): with open(requirements_path, 'r', encoding="UTF-8", errors="ignore") as file: for line in file: package_name = core.remap_pip_package(line.strip()) if package_name and not core.is_installed(package_name): install_cmd = [sys.executable, "-m", "pip", "install", package_name] output = subprocess.check_output(install_cmd, cwd=repo_path, text=True) for msg_line in output.split('\n'): if 'Requirement already satisfied:' in msg_line: print('.', end='') else: print(msg_line) if os.path.exists(install_script_path) and f'{repo_path}/install.py' not in self.processed_install: self.processed_install.add(f'{repo_path}/install.py') install_cmd = [sys.executable, install_script_path] output = subprocess.check_output(install_cmd, cwd=repo_path, text=True) for msg_line in output.split('\n'): if 'Requirement already satisfied:' in msg_line: print('.', end='') else: print(msg_line) except Exception: print(f"ERROR: Restoring '{url}' is failed.") def restore_dependencies(self): node_paths = [os.path.join(custom_nodes_path, name) for name in os.listdir(custom_nodes_path) if os.path.isdir(os.path.join(custom_nodes_path, name)) and not name.endswith('.disabled')] total = len(node_paths) i = 1 for x in node_paths: print(f"----------------------------------------------------------------------------------------------------") print(f"Restoring [{i}/{total}]: {x}") self.post_install(x) i += 1 def load_custom_nodes(self): channel_dict = core.get_channel_dict() if self.channel not in channel_dict: print(f"[bold red]ERROR: Invalid channel is specified `--channel {self.channel}`[/bold red]", file=sys.stderr) exit(1) if self.mode not in ['remote', 'local', 'cache']: print(f"[bold red]ERROR: Invalid mode is specified `--mode {self.mode}`[/bold red]", file=sys.stderr) exit(1) channel_url = channel_dict[self.channel] res = {} json_obj = asyncio.run(core.get_data_by_mode(self.mode, 'custom-node-list.json', channel_url=channel_url)) for x in json_obj['custom_nodes']: for y in x['files']: if 'github.com' in y and not (y.endswith('.py') or y.endswith('.js')): repo_name = y.split('/')[-1] res[repo_name] = (x, False) if 'id' in x: if x['id'] not in res: res[x['id']] = (x, True) return res def get_custom_node_map(self): if self.custom_node_map_cache is not None: return self.custom_node_map_cache self.custom_node_map_cache = self.load_custom_nodes() return self.custom_node_map_cache def lookup_node_path(self, node_name, robust=False): if '..' in node_name: print(f"\n[bold red]ERROR: Invalid node name '{node_name}'[/bold red]\n") exit(2) custom_node_map = self.get_custom_node_map() if node_name in custom_node_map: node_url = custom_node_map[node_name][0]['files'][0] repo_name = node_url.split('/')[-1] node_path = os.path.join(custom_nodes_path, repo_name) return node_path, custom_node_map[node_name][0] elif robust: node_path = os.path.join(custom_nodes_path, node_name) return node_path, None print(f"\n[bold red]ERROR: Invalid node name '{node_name}'[/bold red]\n") exit(2) cm_ctx = Ctx() def install_node(node_name, is_all=False, cnt_msg=''): if core.is_valid_url(node_name): # install via urls res = core.gitclone_install([node_name]) if not res: print(f"[bold red]ERROR: An error occurred while installing '{node_name}'.[/bold red]") else: print(f"{cnt_msg} [INSTALLED] {node_name:50}") else: node_path, node_item = cm_ctx.lookup_node_path(node_name) if os.path.exists(node_path): if not is_all: print(f"{cnt_msg} [ SKIPPED ] {node_name:50} => Already installed") elif os.path.exists(node_path + '.disabled'): enable_node(node_name) else: res = core.gitclone_install(node_item['files'], instant_execution=True, msg_prefix=f"[{cnt_msg}] ") if not res: print(f"[bold red]ERROR: An error occurred while installing '{node_name}'.[/bold red]") else: print(f"{cnt_msg} [INSTALLED] {node_name:50}") def reinstall_node(node_name, is_all=False, cnt_msg=''): node_path, node_item = cm_ctx.lookup_node_path(node_name) if os.path.exists(node_path): shutil.rmtree(node_path) if os.path.exists(node_path + '.disabled'): shutil.rmtree(node_path + '.disabled') install_node(node_name, is_all=is_all, cnt_msg=cnt_msg) def fix_node(node_name, is_all=False, cnt_msg=''): node_path, node_item = cm_ctx.lookup_node_path(node_name, robust=True) files = node_item['files'] if node_item is not None else [node_path] if os.path.exists(node_path): print(f"{cnt_msg} [ FIXING ]: {node_name:50} => Disabled") res = core.gitclone_fix(files, instant_execution=True) if not res: print(f"ERROR: An error occurred while fixing '{node_name}'.") elif not is_all and os.path.exists(node_path + '.disabled'): print(f"{cnt_msg} [ SKIPPED ]: {node_name:50} => Disabled") elif not is_all: print(f"{cnt_msg} [ SKIPPED ]: {node_name:50} => Not installed") def uninstall_node(node_name, is_all=False, cnt_msg=''): node_path, node_item = cm_ctx.lookup_node_path(node_name, robust=True) files = node_item['files'] if node_item is not None else [node_path] if os.path.exists(node_path) or os.path.exists(node_path + '.disabled'): res = core.gitclone_uninstall(files) if not res: print(f"ERROR: An error occurred while uninstalling '{node_name}'.") else: print(f"{cnt_msg} [UNINSTALLED] {node_name:50}") else: print(f"{cnt_msg} [ SKIPPED ]: {node_name:50} => Not installed") def update_node(node_name, is_all=False, cnt_msg=''): node_path, node_item = cm_ctx.lookup_node_path(node_name, robust=True) files = node_item['files'] if node_item is not None else [node_path] res = core.gitclone_update(files, skip_script=True, msg_prefix=f"[{cnt_msg}] ") if not res: print(f"ERROR: An error occurred while updating '{node_name}'.") return None return node_path def update_parallel(nodes): is_all = False if 'all' in nodes: is_all = True nodes = [x for x in cm_ctx.get_custom_node_map().keys() if os.path.exists(os.path.join(custom_nodes_path, x)) or os.path.exists(os.path.join(custom_nodes_path, x) + '.disabled')] nodes = [x for x in nodes if x.lower() not in ['comfy', 'comfyui', 'all']] total = len(nodes) lock = threading.Lock() processed = [] i = 0 def process_custom_node(x): nonlocal i nonlocal processed with lock: i += 1 try: node_path = update_node(x, is_all=is_all, cnt_msg=f'{i}/{total}') with lock: processed.append(node_path) except Exception as e: print(f"ERROR: {e}") traceback.print_exc() with concurrent.futures.ThreadPoolExecutor(4) as executor: for item in nodes: executor.submit(process_custom_node, item) i = 1 for node_path in processed: if node_path is None: print(f"[{i}/{total}] Post update: ERROR") else: print(f"[{i}/{total}] Post update: {node_path}") cm_ctx.post_install(node_path) i += 1 def update_comfyui(): res = core.update_path(comfy_path, instant_execution=True) if res == 'fail': print("Updating ComfyUI has failed.") elif res == 'updated': print("ComfyUI is updated.") else: print("ComfyUI is already up to date.") def enable_node(node_name, is_all=False, cnt_msg=''): if node_name == 'ComfyUI-Manager': return node_path, node_item = cm_ctx.lookup_node_path(node_name, robust=True) if os.path.exists(node_path + '.disabled'): current_name = node_path + '.disabled' os.rename(current_name, node_path) print(f"{cnt_msg} [ENABLED] {node_name:50}") elif os.path.exists(node_path): print(f"{cnt_msg} [SKIPPED] {node_name:50} => Already enabled") elif not is_all: print(f"{cnt_msg} [SKIPPED] {node_name:50} => Not installed") def disable_node(node_name, is_all=False, cnt_msg=''): if node_name == 'ComfyUI-Manager': return node_path, node_item = cm_ctx.lookup_node_path(node_name, robust=True) if os.path.exists(node_path): current_name = node_path new_name = node_path + '.disabled' os.rename(current_name, new_name) print(f"{cnt_msg} [DISABLED] {node_name:50}") elif os.path.exists(node_path + '.disabled'): print(f"{cnt_msg} [ SKIPPED] {node_name:50} => Already disabled") elif not is_all: print(f"{cnt_msg} [ SKIPPED] {node_name:50} => Not installed") def show_list(kind, simple=False): for k, v in cm_ctx.get_custom_node_map().items(): if v[1]: continue node_path = os.path.join(custom_nodes_path, k) states = set() if os.path.exists(node_path): prefix = '[ ENABLED ] ' states.add('installed') states.add('enabled') states.add('all') elif os.path.exists(node_path + '.disabled'): prefix = '[ DISABLED ] ' states.add('installed') states.add('disabled') states.add('all') else: prefix = '[ NOT INSTALLED ] ' states.add('not-installed') states.add('all') if kind in states: if simple: print(f"{k:50}") else: short_id = v[0].get('id', "") print(f"{prefix} {k:50} {short_id:20} (author: {v[0]['author']})") # unregistered nodes candidates = os.listdir(os.path.realpath(custom_nodes_path)) for k in candidates: fullpath = os.path.join(custom_nodes_path, k) if os.path.isfile(fullpath): continue if k in ['__pycache__']: continue states = set() if k.endswith('.disabled'): prefix = '[ DISABLED ] ' states.add('installed') states.add('disabled') states.add('all') k = k[:-9] else: prefix = '[ ENABLED ] ' states.add('installed') states.add('enabled') states.add('all') if k not in cm_ctx.get_custom_node_map(): if kind in states: if simple: print(f"{k:50}") else: print(f"{prefix} {k:50} {'':20} (author: N/A)") def show_snapshot(simple_mode=False): json_obj = core.get_current_snapshot() if simple_mode: print(f"[{json_obj['comfyui']}] comfyui") for k, v in json_obj['git_custom_nodes'].items(): print(f"[{v['hash']}] {k}") for v in json_obj['file_custom_nodes']: print(f"[ N/A ] {v['filename']}") else: formatted_json = json.dumps(json_obj, ensure_ascii=False, indent=4) print(formatted_json) def show_snapshot_list(simple_mode=False): snapshot_path = os.path.join(comfyui_manager_path, 'snapshots') files = os.listdir(snapshot_path) json_files = [x for x in files if x.endswith('.json')] for x in sorted(json_files): print(x) def cancel(): if os.path.exists(script_path): os.remove(script_path) if os.path.exists(restore_snapshot_path): os.remove(restore_snapshot_path) def auto_save_snapshot(): path = core.save_snapshot_with_postfix('cli-autosave') print(f"Current snapshot is saved as `{path}`") def for_each_nodes(nodes, act, allow_all=True): is_all = False if allow_all and 'all' in nodes: is_all = True nodes = [x for x in cm_ctx.get_custom_node_map().keys() if os.path.exists(os.path.join(custom_nodes_path, x)) or os.path.exists(os.path.join(custom_nodes_path, x) + '.disabled')] nodes = [x for x in nodes if x.lower() not in ['comfy', 'comfyui', 'all']] total = len(nodes) i = 1 for x in nodes: try: act(x, is_all=is_all, cnt_msg=f'{i}/{total}') except Exception as e: print(f"ERROR: {e}") traceback.print_exc() i += 1 app = typer.Typer() @app.command(help="Display help for commands") def help(ctx: typer.Context): print(ctx.find_root().get_help()) ctx.exit(0) @app.command(help="Install custom nodes") def install( nodes: List[str] = typer.Argument( ..., help="List of custom nodes to install" ), channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): cm_ctx.set_channel_mode(channel, mode) for_each_nodes(nodes, act=install_node) @app.command(help="Reinstall custom nodes") def reinstall( nodes: List[str] = typer.Argument( ..., help="List of custom nodes to reinstall" ), channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): cm_ctx.set_channel_mode(channel, mode) for_each_nodes(nodes, act=reinstall_node) @app.command(help="Uninstall custom nodes") def uninstall( nodes: List[str] = typer.Argument( ..., help="List of custom nodes to uninstall" ), channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): cm_ctx.set_channel_mode(channel, mode) for_each_nodes(nodes, act=uninstall_node) @app.command(help="Disable custom nodes") def update( nodes: List[str] = typer.Argument( ..., help="[all|List of custom nodes to update]" ), channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): cm_ctx.set_channel_mode(channel, mode) if 'all' in nodes: auto_save_snapshot() for x in nodes: if x.lower() in ['comfyui', 'comfy', 'all']: update_comfyui() break update_parallel(nodes) @app.command(help="Disable custom nodes") def disable( nodes: List[str] = typer.Argument( ..., help="[all|List of custom nodes to disable]" ), channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): cm_ctx.set_channel_mode(channel, mode) if 'all' in nodes: auto_save_snapshot() for_each_nodes(nodes, disable_node, allow_all=True) @app.command(help="Enable custom nodes") def enable( nodes: List[str] = typer.Argument( ..., help="[all|List of custom nodes to enable]" ), channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): cm_ctx.set_channel_mode(channel, mode) if 'all' in nodes: auto_save_snapshot() for_each_nodes(nodes, enable_node, allow_all=True) @app.command(help="Fix dependencies of custom nodes") def fix( nodes: List[str] = typer.Argument( ..., help="[all|List of custom nodes to fix]" ), channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): cm_ctx.set_channel_mode(channel, mode) if 'all' in nodes: auto_save_snapshot() for_each_nodes(nodes, fix_node, allow_all=True) @app.command("show", help="Show node list (simple mode)") def show( arg: str = typer.Argument( help="[installed|enabled|not-installed|disabled|all|snapshot|snapshot-list]" ), channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): valid_commands = [ "installed", "enabled", "not-installed", "disabled", "all", "snapshot", "snapshot-list", ] if arg not in valid_commands: typer.echo(f"Invalid command: `show {arg}`", err=True) exit(1) cm_ctx.set_channel_mode(channel, mode) if arg == 'snapshot': show_snapshot() elif arg == 'snapshot-list': show_snapshot_list() else: show_list(arg) @app.command("simple-show", help="Show node list (simple mode)") def simple_show( arg: str = typer.Argument( help="[installed|enabled|not-installed|disabled|all|snapshot|snapshot-list]" ), channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): valid_commands = [ "installed", "enabled", "not-installed", "disabled", "all", "snapshot", "snapshot-list", ] if arg not in valid_commands: typer.echo(f"[bold red]Invalid command: `show {arg}`[/bold red]", err=True) exit(1) cm_ctx.set_channel_mode(channel, mode) if arg == 'snapshot': show_snapshot(True) elif arg == 'snapshot-list': show_snapshot_list(True) else: show_list(arg, True) @app.command('cli-only-mode', help="Set whether to use ComfyUI-Manager in CLI-only mode.") def cli_only_mode( mode: str = typer.Argument( ..., help="[enable|disable]" )): cli_mode_flag = os.path.join(os.path.dirname(__file__), '.enable-cli-only-mode') if mode.lower() == 'enable': with open(cli_mode_flag, 'w') as file: pass print(f"\nINFO: `cli-only-mode` is enabled\n") elif mode.lower() == 'disable': if os.path.exists(cli_mode_flag): os.remove(cli_mode_flag) print(f"\nINFO: `cli-only-mode` is disabled\n") else: print(f"\n[bold red]Invalid value for cli-only-mode: {mode}[/bold red]\n") exit(1) @app.command( "deps-in-workflow", help="Generate dependencies file from workflow (.json/.png)" ) def deps_in_workflow( workflow: Annotated[ str, typer.Option(show_default=False, help="Workflow file (.json/.png)") ], output: Annotated[ str, typer.Option(show_default=False, help="Output file (.json)") ], channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): cm_ctx.set_channel_mode(channel, mode) input_path = workflow output_path = output if not os.path.exists(input_path): print(f"[bold red]File not found: {input_path}[/bold red]") exit(1) used_exts, unknown_nodes = asyncio.run(core.extract_nodes_from_workflow(input_path, mode=cm_ctx.mode, channel_url=cm_ctx.channel)) custom_nodes = {} for x in used_exts: custom_nodes[x] = {'state': core.simple_check_custom_node(x), 'hash': '-' } res = { 'custom_nodes': custom_nodes, 'unknown_nodes': list(unknown_nodes) } with open(output_path, "w", encoding='utf-8') as output_file: json.dump(res, output_file, indent=4) print(f"Workflow dependencies are being saved into {output_path}.") @app.command("save-snapshot", help="Save a snapshot of the current ComfyUI environment. If output path isn't provided. Save to ComfyUI-Manager/snapshots path.") def save_snapshot( output: Annotated[ str, typer.Option( show_default=False, help="Specify the output file path. (.json/.yaml)" ), ] = None, ): path = core.save_snapshot_with_postfix('snapshot', output) print(f"Current snapshot is saved as `{path}`") @app.command("restore-snapshot", help="Restore snapshot from snapshot file") def restore_snapshot( snapshot_name: str, pip_non_url: Optional[bool] = typer.Option( default=None, show_default=False, is_flag=True, help="Restore for pip packages registered on PyPI.", ), pip_non_local_url: Optional[bool] = typer.Option( default=None, show_default=False, is_flag=True, help="Restore for pip packages registered at web URLs.", ), pip_local_url: Optional[bool] = typer.Option( default=None, show_default=False, is_flag=True, help="Restore for pip packages specified by local paths.", ), ): extras = [] if pip_non_url: extras.append('--pip-non-url') if pip_non_local_url: extras.append('--pip-non-local-url') if pip_local_url: extras.append('--pip-local-url') print(f"PIPs restore mode: {extras}") if os.path.exists(snapshot_name): snapshot_path = os.path.abspath(snapshot_name) else: snapshot_path = os.path.join(core.comfyui_manager_path, 'snapshots', snapshot_name) if not os.path.exists(snapshot_path): print(f"[bold red]ERROR: `{snapshot_path}` is not exists.[/bold red]") exit(1) try: cloned_repos = [] checkout_repos = [] skipped_repos = [] enabled_repos = [] disabled_repos = [] is_failed = False def extract_infos(msg): nonlocal is_failed for x in msg: if x.startswith("CLONE: "): cloned_repos.append(x[7:]) elif x.startswith("CHECKOUT: "): checkout_repos.append(x[10:]) elif x.startswith("SKIPPED: "): skipped_repos.append(x[9:]) elif x.startswith("ENABLE: "): enabled_repos.append(x[8:]) elif x.startswith("DISABLE: "): disabled_repos.append(x[9:]) elif 'APPLY SNAPSHOT: False' in x: is_failed = True print(f"Restore snapshot.") cmd_str = [sys.executable, git_script_path, '--apply-snapshot', snapshot_path] + extras output = subprocess.check_output(cmd_str, cwd=custom_nodes_path, text=True) msg_lines = output.split('\n') extract_infos(msg_lines) for url in cloned_repos: cm_ctx.post_install(url) # print summary for x in cloned_repos: print(f"[ INSTALLED ] {x}") for x in checkout_repos: print(f"[ CHECKOUT ] {x}") for x in enabled_repos: print(f"[ ENABLED ] {x}") for x in disabled_repos: print(f"[ DISABLED ] {x}") if is_failed: print(output) print("[bold red]ERROR: Failed to restore snapshot.[/bold red]") except Exception: print("[bold red]ERROR: Failed to restore snapshot.[/bold red]") traceback.print_exc() raise typer.Exit(code=1) @app.command( "restore-dependencies", help="Restore dependencies from whole installed custom nodes." ) def restore_dependencies(): node_paths = [os.path.join(custom_nodes_path, name) for name in os.listdir(custom_nodes_path) if os.path.isdir(os.path.join(custom_nodes_path, name)) and not name.endswith('.disabled')] total = len(node_paths) i = 1 for x in node_paths: print(f"----------------------------------------------------------------------------------------------------") print(f"Restoring [{i}/{total}]: {x}") cm_ctx.post_install(x) i += 1 @app.command( "post-install", help="Install dependencies and execute installation script" ) def post_install( path: str = typer.Argument( help="path to custom node", )): path = os.path.expanduser(path) cm_ctx.post_install(path) @app.command( "install-deps", help="Install dependencies from dependencies file(.json) or workflow(.png/.json)", ) def install_deps( deps: str = typer.Argument( help="Dependency spec file (.json)", ), channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" ), ): cm_ctx.set_channel_mode(channel, mode) auto_save_snapshot() if not os.path.exists(deps): print(f"[bold red]File not found: {deps}[/bold red]") exit(1) else: with open(deps, 'r', encoding="UTF-8", errors="ignore") as json_file: try: json_obj = json.load(json_file) except: print(f"[bold red]Invalid json file: {deps}[/bold red]") exit(1) for k in json_obj['custom_nodes'].keys(): state = core.simple_check_custom_node(k) if state == 'installed': continue elif state == 'not-installed': core.gitclone_install([k], instant_execution=True) else: # disabled core.gitclone_set_active([k], False) print("Dependency installation and activation complete.") @app.command(help="Clear reserved startup action in ComfyUI-Manager") def clear(): cancel() @app.command("export-custom-node-ids", help="Export custom node ids") def export_custom_node_ids( path: str, channel: Annotated[ str, typer.Option( show_default=False, help="Specify the operation mode" ), ] = None, mode: str = typer.Option( None, help="[remote|local|cache]" )): cm_ctx.set_channel_mode(channel, mode) with open(path, "w", encoding='utf-8') as output_file: for x in cm_ctx.get_custom_node_map().keys(): print(x, file=output_file) if __name__ == '__main__': sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) sys.exit(app()) print(f"")