Osterkarten / modules /launch_util.py
lllyasviel's picture
i
3bdbace
raw
history blame
4.16 kB
import os
import importlib
import importlib.util
import subprocess
import sys
import re
from functools import lru_cache
python = sys.executable
git = os.environ.get('GIT', "git")
default_command_live = (os.environ.get('LAUNCH_LIVE_OUTPUT') == "1")
index_url = os.environ.get('INDEX_URL', "")
modules_path = os.path.dirname(os.path.realpath(__file__))
script_path = os.path.dirname(modules_path)
dir_repos = "repositories"
fooocus_tag = '1.0.0'
@lru_cache()
def commit_hash():
try:
return subprocess.check_output([git, "rev-parse", "HEAD"], shell=False, encoding='utf8').strip()
except Exception:
return "<none>"
def git_clone(url, dir, name, commithash=None):
# TODO clone into temporary dir and move if successful
if os.path.exists(dir):
if commithash is None:
return
current_hash = run(f'"{git}" -C "{dir}" rev-parse HEAD', None, f"Couldn't determine {name}'s hash: {commithash}", live=False).strip()
if current_hash == commithash:
return
run(f'"{git}" -C "{dir}" fetch', f"Fetching updates for {name}...", f"Couldn't fetch {name}")
run(f'"{git}" -C "{dir}" checkout {commithash}', f"Checking out commit for {name} with hash: {commithash}...", f"Couldn't checkout commit {commithash} for {name}", live=True)
return
run(f'"{git}" clone "{url}" "{dir}"', f"Cloning {name} into {dir}...", f"Couldn't clone {name}", live=True)
if commithash is not None:
run(f'"{git}" -C "{dir}" checkout {commithash}', None, "Couldn't checkout {name}'s hash: {commithash}")
def repo_dir(name):
return os.path.join(script_path, dir_repos, name)
def is_installed(package):
try:
spec = importlib.util.find_spec(package)
except ModuleNotFoundError:
return False
return spec is not None
def run(command, desc=None, errdesc=None, custom_env=None, live: bool = default_command_live) -> str:
if desc is not None:
print(desc)
run_kwargs = {
"args": command,
"shell": True,
"env": os.environ if custom_env is None else custom_env,
"encoding": 'utf8',
"errors": 'ignore',
}
if not live:
run_kwargs["stdout"] = run_kwargs["stderr"] = subprocess.PIPE
result = subprocess.run(**run_kwargs)
if result.returncode != 0:
error_bits = [
f"{errdesc or 'Error running command'}.",
f"Command: {command}",
f"Error code: {result.returncode}",
]
if result.stdout:
error_bits.append(f"stdout: {result.stdout}")
if result.stderr:
error_bits.append(f"stderr: {result.stderr}")
raise RuntimeError("\n".join(error_bits))
return (result.stdout or "")
def run_pip(command, desc=None, live=default_command_live):
index_url_line = f' --index-url {index_url}' if index_url != '' else ''
return run(f'"{python}" -m pip {command} --prefer-binary{index_url_line}', desc=f"Installing {desc}", errdesc=f"Couldn't install {desc}", live=live)
re_requirement = re.compile(r"\s*([-_a-zA-Z0-9]+)\s*(?:==\s*([-+_.a-zA-Z0-9]+))?\s*")
def requirements_met(requirements_file):
"""
Does a simple parse of a requirements.txt file to determine if all rerqirements in it
are already installed. Returns True if so, False if not installed or parsing fails.
"""
import importlib.metadata
import packaging.version
with open(requirements_file, "r", encoding="utf8") as file:
for line in file:
if line.strip() == "":
continue
m = re.match(re_requirement, line)
if m is None:
return False
package = m.group(1).strip()
version_required = (m.group(2) or "").strip()
if version_required == "":
continue
try:
version_installed = importlib.metadata.version(package)
except Exception:
return False
if packaging.version.parse(version_required) != packaging.version.parse(version_installed):
return False
return True