import csv
import os
import secrets
import sys
from itertools import islice
from pathlib import Path
from typing import Any
from urllib.parse import quote

from aiohttp import web

from .log import mklog
from .utils import (
    backup_file,
    import_install,
    input_dir,
    output_dir,
    reqs_map,
    run_command,
    styles_dir,
)

endlog = mklog("mtb endpoint")

# - ACTIONS
import_install("requirements")


def ACTIONS_installDependency(dependency_names=None):
    """Install the requested dependencies with pip.

    Args:
        dependency_names: A list of dependency names, or a single name as a
            string. Each name is mapped through ``reqs_map`` (falling back to
            the name itself) before being handed to ``pip install``.

    Returns:
        ``{"success": True}`` when the pip command did not raise, otherwise
        a dict with an ``"error"`` message.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(dependency_names, str):
        dependency_names = [dependency_names]

    # Covers both "nothing passed" and an empty list (pip would fail on it).
    if not dependency_names:
        # return web.Response(text="No dependency name provided", status=400)
        return {"error": "No dependency name provided"}

    endlog.debug(
        f"Received Install Dependency request for {dependency_names}"
    )
    # reqs = []
    resolved_names = [reqs_map.get(name, name) for name in dependency_names]

    # NOTE(review): this action is reachable through `do_action`, so the
    # names come from an HTTP request body; confirm the endpoint is only
    # exposed to trusted clients.
    try:
        run_command(
            [Path(sys.executable), "-m", "pip", "install", *resolved_names]
        )
    except Exception as e:
        # Boundary handler: the failure is reported to the client as data.
        endlog.warning(f"Failed to install {resolved_names}: {e}")
        return {"error": f"Failed to install dependencies: {e}"}
    return {"success": True}

    # if platform.system() == "Windows":
    #     reqs = list(requirements.parse((here / "reqs_windows.txt").read_text()))
    # else:
    #     reqs = list(requirements.parse((here / "reqs.txt").read_text()))
    # print([x.specs for x in reqs])
    # print(
    #     "\n".join([f"{x.line} {''.join(x.specs[0] if x.specs else '')}" for x in reqs])
    # )
    # for dependency_name in dependency_names:
    #     for req in reqs:
    #         if req.name == dependency_name:
    #             endlog.debug(f"Dependency {dependency_name} installed")
    #             break


def ACTIONS_getUserImages(
    mode: str,
    count=200,
    offset=0,
    sort: str | None = None,
    include_subfolders: bool = False,
):
    """List PNG images of the input or output directory as preview URLs.

    The action is only available when the ``MTB_EXPOSE`` environment
    variable is set.

    Args:
        mode: ``"input"`` selects ``input_dir``; any other value selects
            ``output_dir``. The value is also echoed in the URL ``type``.
        count: Maximum number of images to return.
        offset: Number of leading entries to skip.
        sort: One of ``"none"``, ``"modified"``, ``"modified-reverse"``,
            ``"name"``, ``"name-reverse"`` (case-insensitive). Unknown
            values log a warning and fall back to unsorted.
        include_subfolders: Search recursively and report each image's
            folder relative to the selected directory.

    Returns:
        A dict mapping each image stem to its ``/mtb/view`` URL, or a dict
        with an ``"error"`` key when the session is not authorized.
    """
    # TODO: find a better name :s
    if "MTB_EXPOSE" not in os.environ:
        return {"error": "Session not authorized to getInputs"}

    entry_dir = input_dir if mode == "input" else output_dir
    pattern = "**/*.png" if include_subfolders else "*.png"
    entry_gen = entry_dir.glob(pattern)

    sort_mode = sort.lower() if sort else "none"
    if sort_mode == "none":
        entries = entry_gen
    elif sort_mode in ("modified", "modified-reverse"):
        # "modified" lists the most recently changed files first.
        entries = sorted(
            entry_gen,
            key=lambda p: p.stat().st_mtime,
            reverse=sort_mode == "modified",
        )
    elif sort_mode in ("name", "name-reverse"):
        entries = sorted(
            entry_gen,
            key=lambda p: p.name,
            reverse=sort_mode == "name-reverse",
        )
    else:
        endlog.warning(f"Sort mode {sort_mode} not supported")
        entries = entry_gen

    # Clamp so that a zero/negative count yields no images and a negative
    # offset behaves like 0 (islice rejects negative bounds).
    start = max(0, offset)
    stop = start + max(0, count)

    imgs = {}
    # NOTE: keyed by stem, so same-named files in different subfolders
    # overwrite each other.
    for img in islice(entries, start, stop):
        subfolder = (
            img.parent.relative_to(entry_dir) if include_subfolders else ""
        )
        # Percent-encode path pieces so names containing '&', '#', '+' or
        # spaces do not corrupt the query string.
        imgs[img.stem] = (
            f"/mtb/view?filename={quote(img.name)}&width=512&type={mode}"
            f"&subfolder={quote(str(subfolder))}"
            f"&preview=&rand={secrets.randbelow(424242)}"
        )

    return imgs


def ACTIONS_getStyles(style_name=None):
    """Return the loaded styles, or a single style when a name is given.

    Keys starting with ``"__"`` and the ``"name"`` key are filtered out.

    Returns:
        The filtered styles dict, the entry for ``style_name``, or a dict
        with an ``"error"`` key when nothing matches.
    """
    from .nodes.conditions import MTB_StylesLoader

    styles = MTB_StylesLoader.options
    if not styles:
        return {"error": "No styles found"}

    match_list = ["name"]
    filtered_styles = {
        key: value
        for key, value in styles.items()
        if not key.startswith("__") and key not in match_list
    }
    if style_name:
        return filtered_styles.get(style_name, {"error": "Style not found"})
    return filtered_styles


def ACTIONS_saveStyle(data):
    """Overwrite an existing style CSV with the rows sent by the client.

    Args:
        data: Mapping of file name to a list of rows. The first key that
            matches an existing ``.csv`` file in ``styles_dir`` is used;
            the remaining keys are ignored.

    Returns:
        ``None`` on success, or a dict with an ``"error"`` key when no key
        matches an existing style file.
    """
    # endlog.debug(f"Received Save Styles for {data.keys()}")
    # endlog.debug(data)

    # Only already-existing CSV files may be written to.
    styles = [f.name for f in styles_dir.iterdir() if f.suffix == ".csv"]
    target = None
    rows = []
    for fp, content in data.items():
        if fp in styles:
            endlog.debug(f"Overwriting {fp}")
            target = styles_dir / fp
            rows = content
            break

    if not target:
        endlog.warning(
            f"Could not determine the target file for {data.keys()}"
        )
        return {"error": "Could not determine the target file for the style"}

    backup_file(target)

    with target.open("w", newline="", encoding="utf-8") as file:
        csv.writer(file, quoting=csv.QUOTE_ALL).writerows(rows)


async def do_action(request: web.Request) -> web.Response:
    """Dispatch a JSON action request to the matching ``ACTIONS_*`` function.

    The request body is expected to be a JSON object with a ``name`` and
    optional ``args``. A list of args is unpacked positionally; any other
    truthy value is passed as a single argument.

    Returns:
        A JSON response holding ``{"result": ...}``, or an error payload
        listing the available action names when ``name`` is unknown.
    """
    endlog.debug("Init action request")
    request_data = await request.json()
    name = request_data.get("name")
    args = request_data.get("args")

    endlog.debug(f"Received action request: {name} {args}")

    method_name = f"ACTIONS_{name}"
    method = globals().get(method_name)

    if callable(method):
        if not args:
            result = method()
        elif isinstance(args, list):
            result = method(*args)
        else:
            result = method(args)
        endlog.debug(f"Action result: {result}")
        return web.json_response({"result": result})

    available_methods = [
        attr[len("ACTIONS_") :]
        for attr in globals()
        if attr.startswith("ACTIONS_")
    ]

    return web.json_response(
        {
            "error": "Invalid method name.",
            "available_methods": available_methods,
        }
    )


# - HTML UTILS


def dependencies_button(name: str, dependencies: list[str]) -> str:
    """Return the markup for a dependency install button."""
    # NOTE(review): `deps` is not referenced by the visible template; the
    # markup that used it may be missing - confirm against the template.
    deps = ",".join([f"'{x}'" for x in dependencies])
    return f""" """


def csv_editor():
    """Render the style editor from every CSV file in ``styles_dir``.

    The first three cells of each CSV row are used. Blank rows are skipped
    and rows with fewer than three cells are padded with empty strings.
    """
    inputs = [f for f in styles_dir.iterdir() if f.suffix == ".csv"]
    # rows = {f.stem: list(csv.reader(f.read_text("utf8"))) for f in styles}

    style_files = {}
    for file in inputs:
        # newline="" lets the csv module handle embedded newlines itself.
        with open(file, encoding="utf8", newline="") as f:
            file_rows = []
            for row in csv.reader(f):
                if not row:
                    # csv.reader yields [] for blank lines.
                    continue
                cells = (list(row) + ["", "", ""])[:3]
                endlog.debug(f"Adding style {cells[0]}")
                file_rows.append((cells[0], cells[1], cells[2]))
            style_files[file.name] = file_rows

    html_out = """

Style Editor

"""
    for current, styles in style_files.items():
        current_out = f"\n\n{current}\n\n"
        table_rows = []
        for index, style in enumerate(styles):
            if index == 0:
                # The first CSV row is rendered as the header row.
                table_rows += (
                    [""] + [f"{cell}" for cell in style] + [""]
                )
            else:
                # NOTE(review): `cell` is not referenced by the visible
                # templates below - confirm against the original markup.
                table_rows += (
                    [""]
                    + [
                        f"" if i == 0 else f""
                        for i, cell in enumerate(style)
                    ]
                    + [""]
                )
        current_out += f"" + "".join(table_rows) + "\n"
        current_out += f""

        html_out += add_foldable_region(current, current_out)

    html_out += "\n"
    html_out += """"""

    return html_out


def render_tab_view(**kwargs):
    """Render one tab per keyword argument (tab name -> tab content)."""
    tab_headers = []
    tab_contents = []
    for idx, (tab_name, content) in enumerate(kwargs.items()):
        # The first tab is the one marked active.
        active_class = "active" if idx == 0 else ""
        tab_headers.append(f"")
        tab_contents.append(f"\n{content}\n")

    headers_str = "\n".join(tab_headers)
    contents_str = "\n".join(tab_contents)

    return f"""
{headers_str}
{contents_str}
"""


def add_foldable_region(title: str, content: str):
    """Wrap ``content`` in a foldable region labelled with ``title``."""
    symbol_id = f"{title}-symbol"
    return f"""
{title}
{content}
"""


def add_split_pane(
    left_content: str, right_content: str, *, vertical: bool = True
):
    """Place two pieces of content in a split pane."""
    orientation = "vertical" if vertical else "horizontal"
    return f"""
{left_content}
{right_content}
"""


def add_dropdown(title: str, options: list[str]):
    """Return the markup for a dropdown built from ``options``."""
    option_str = "\n".join([f"" for opt in options])
    return f""" """


def render_table(table_dict: dict[str, Any], sort=True, title=None):
    """Render a (possibly nested) dict as a table.

    Args:
        table_dict: Mapping of row name to value. Dict values that hold a
            ``"dependencies"`` key render a dependency button; other dict
            values are rendered recursively as nested tables.
        sort: Sort rows by key when true, otherwise keep insertion order.
            Nested tables follow the same setting.
        title: Optional caption; omitted when ``None``.
    """
    entries = table_dict.items()
    if sort:
        entries = sorted(entries, key=lambda item: item[0])

    row_parts = []
    for name, item in entries:
        if isinstance(item, dict):
            if "dependencies" in item:
                row_parts.append(f"{name}")
                row_parts.append(
                    f"{dependencies_button(name, item['dependencies'])}"
                )
                row_parts.append("")
            else:
                row_parts.append(f"{name}{render_table(item, sort=sort)}")
        # elif isinstance(item, str):
        #     table_rows += f"{name}{item}"
        else:
            row_parts.append(f"{name}{item}")
    table_rows = "".join(row_parts)

    caption = "" if title is None else f"\n\n{title}\n\n"
    return f"""
{caption} {table_rows}
Name Description
"""


def render_base_template(title: str, content: str):
    """Wrap ``content`` in the base page template titled ``title``."""
    github_icon_svg = """"""
    return f""" {title}
Back to Comfy {github_icon_svg}
{content}
"""