|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" ConfigMixin base class and utilities.""" |
|
import functools |
|
import importlib |
|
import inspect |
|
import json |
|
import os |
|
import re |
|
import tempfile |
|
from collections import OrderedDict |
|
from typing import Any, Dict, Optional, Tuple, Union |
|
|
|
import numpy as np |
|
from huggingface_hub import ( |
|
create_repo, |
|
get_hf_file_metadata, |
|
hf_hub_download, |
|
hf_hub_url, |
|
repo_type_and_id_from_hf_id, |
|
upload_folder, |
|
) |
|
from huggingface_hub.utils import EntryNotFoundError |
|
from requests import HTTPError |
|
|
|
from .download_utils import ppdiffusers_bos_download |
|
from .utils import ( |
|
DOWNLOAD_SERVER, |
|
HF_CACHE, |
|
PPDIFFUSERS_CACHE, |
|
DummyObject, |
|
deprecate, |
|
logging, |
|
) |
|
from .version import VERSION as __version__ |
|
|
|
# Module-level logger obtained from the project's logging helper.
logger = logging.get_logger(__name__)

# Matches file names of the form ``config.<anything>.json``; the middle part is captured as group 1.
_re_configuration_file = re.compile(r"config\.(.*)\.json")
|
|
|
|
|
class FrozenDict(OrderedDict):
    """
    Ordered dictionary used to hold configuration values.

    Every item present at construction time is mirrored as an instance attribute, so `d.key` works next to
    `d["key"]`. The mutating helpers `__delitem__`, `setdefault`, `pop` and `update` always raise.

    NOTE(review): `self.__frozen` is name-mangled to `_FrozenDict__frozen`, while the guards in `__setattr__` and
    `__setitem__` test `hasattr(self, "__frozen")` with the literal, unmangled name. As written, those guards
    therefore do not trigger after construction, and item/attribute assignment still succeeds. Other code in this
    module (`ConfigMixin.to_json_string`) assigns items on the config dict, so enforcing the freeze needs a
    coordinated change and is intentionally not done here.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Mirror each item as an attribute (keys must therefore be valid `setattr` names, i.e. strings).
        for key, value in self.items():
            setattr(self, key, value)

        self.__frozen = True

    def __delitem__(self, *args, **kwargs):
        raise Exception(f"You cannot use ``__delitem__`` on a {self.__class__.__name__} instance.")

    def setdefault(self, *args, **kwargs):
        raise Exception(f"You cannot use ``setdefault`` on a {self.__class__.__name__} instance.")

    def pop(self, *args, **kwargs):
        raise Exception(f"You cannot use ``pop`` on a {self.__class__.__name__} instance.")

    def update(self, *args, **kwargs):
        raise Exception(f"You cannot use ``update`` on a {self.__class__.__name__} instance.")

    def __setattr__(self, name, value):
        if hasattr(self, "__frozen") and self.__frozen:
            raise Exception(f"You cannot use ``__setattr__`` on a {self.__class__.__name__} instance.")
        super().__setattr__(name, value)

    def __setitem__(self, name, value):
        if hasattr(self, "__frozen") and self.__frozen:
            # The message names the operation that was actually attempted.
            raise Exception(f"You cannot use ``__setitem__`` on a {self.__class__.__name__} instance.")
        super().__setitem__(name, value)
|
|
|
|
|
class ConfigMixin:
    r"""
    Base class for all configuration classes. Stores all configuration parameters under `self.config`. Also handles
    all methods for loading/downloading/saving classes inheriting from [`ConfigMixin`] with
        - [`~ConfigMixin.from_config`]
        - [`~ConfigMixin.save_config`]

    Class attributes:
        - **config_name** (`str`) -- A filename under which the config should be stored when calling
          [`~ConfigMixin.save_config`] (should be overridden by subclass).
        - **ignore_for_config** (`List[str]`) -- A list of attributes that should not be saved in the config (should be
          overridden by subclass).
        - **has_compatibles** (`bool`) -- Whether the class has compatible classes (should be overridden by subclass).
        - **_deprecated_kwargs** (`List[str]`) -- Keyword arguments that are deprecated. Note that the init function
          should only have a `kwargs` argument if at least one argument is deprecated (should be overridden by
          subclass).
    """

    config_name = None
    ignore_for_config = []
    has_compatibles = False
    _deprecated_kwargs = []

    def register_to_config(self, **kwargs):
        """
        Store `kwargs` as attributes on the instance and merge them into the internal config dictionary.

        Args:
            kwargs: Configuration values to register. An entry literally named `kwargs` is dropped.

        Raises:
            NotImplementedError: If the class does not define `config_name`.
            AttributeError: If one of the values cannot be set as an attribute (logged, then re-raised).
        """
        if self.config_name is None:
            raise NotImplementedError(f"Make sure that {self.__class__} has defined a class name `config_name`")

        # A catch-all `kwargs` entry is never part of the stored configuration.
        kwargs.pop("kwargs", None)
        for key, value in kwargs.items():
            try:
                setattr(self, key, value)
            except AttributeError:
                logger.error(f"Can't set {key} with value {value} for {self}")
                raise

        if not hasattr(self, "_internal_dict"):
            internal_dict = kwargs
        else:
            # Later registrations override earlier values for the same key.
            previous_dict = dict(self._internal_dict)
            internal_dict = {**self._internal_dict, **kwargs}
            logger.debug(f"Updating config from {previous_dict} to {internal_dict}")

        self._internal_dict = FrozenDict(internal_dict)

    def save_config(self, save_directory: Union[str, os.PathLike], push_to_hub: bool = False, **kwargs):
        """
        Save a configuration object to the directory `save_directory`, so that it can be re-loaded using the
        [`~ConfigMixin.from_config`] class method.

        Args:
            save_directory (`str` or `os.PathLike`):
                Directory where the configuration JSON file will be saved (will be created if it does not exist).
            push_to_hub (`bool`, *optional*, defaults to `False`):
                Accepted for interface compatibility; not used by this method.
            kwargs:
                Accepted for interface compatibility; not used by this method.

        Raises:
            AssertionError: If `save_directory` points to an existing file.
        """
        if os.path.isfile(save_directory):
            raise AssertionError(f"Provided path ({save_directory}) should be a directory, not a file")

        os.makedirs(save_directory, exist_ok=True)

        # The file is always named after the class-level `config_name`.
        output_config_file = os.path.join(save_directory, self.config_name)

        self.to_json_file(output_config_file)
        logger.info(f"Configuration saved in {output_config_file}")

    def save_to_hf_hub(
        self,
        repo_id: str,
        private: Optional[bool] = None,
        subfolder: Optional[str] = None,
        commit_message: Optional[str] = None,
        revision: Optional[str] = None,
        create_pr: bool = False,
    ):
        """
        Uploads all elements of this config to a new HuggingFace Hub repository.

        Args:
            repo_id (str): Repository name for your model/tokenizer in the Hub.
            private (bool, optional): Whether the model/tokenizer is set to private.
            subfolder (str, optional): Push to a subfolder of the repo instead of the root.
            commit_message (str, optional): The summary / title / first line of the generated commit.
                Defaults to: f"Upload {path_in_repo} with huggingface_hub"
            revision (str, optional): The git revision to commit from. Defaults to the head of the "main" branch.
            create_pr (boolean, optional): Whether or not to create a Pull Request with that commit. Defaults to
                False. If revision is not set, PR is opened against the "main" branch. If revision is set and is a
                branch, PR is opened against this branch. If revision is set and is not a branch name (example: a
                commit oid), a RevisionNotFoundError is returned by the server.

        Returns: The url of the commit of your model in the given repository.
        """
        repo_url = create_repo(repo_id, private=private, exist_ok=True)

        # Rebuild the repo id from the URL returned by `create_repo`, so the owner part is always present.
        _, repo_owner, repo_name = repo_type_and_id_from_hf_id(repo_url)
        repo_id = f"{repo_owner}/{repo_name}"

        # Find out whether the repository already has a README.md.
        try:
            get_hf_file_metadata(hf_hub_url(repo_id=repo_id, filename="README.md", revision=revision))
            has_readme = True
        except EntryNotFoundError:
            has_readme = False

        with tempfile.TemporaryDirectory() as root_dir:
            save_dir = root_dir if subfolder is None else os.path.join(root_dir, subfolder)
            self.save_config(save_dir)

            # Only add (and only announce) a default README when the repository does not have one yet.
            if not has_readme:
                logger.info("README.md not found, adding the default README.md")
                with open(os.path.join(root_dir, "README.md"), "w", encoding="utf-8") as f:
                    f.write(f"---\nlibrary_name: ppdiffusers\n---\n# {repo_id}")

            # Upload while the temporary directory still exists.
            logger.info(f"Pushing to the {repo_id}. This might take a while")
            return upload_folder(
                repo_id=repo_id,
                repo_type="model",
                folder_path=root_dir,
                commit_message=commit_message,
                revision=revision,
                create_pr=create_pr,
            )

    @classmethod
    def from_config(cls, config: Union["FrozenDict", Dict[str, Any]] = None, return_unused_kwargs=False, **kwargs):
        r"""
        Instantiate a Python class from a config dictionary

        Parameters:
            config (`Dict[str, Any]`):
                A config dictionary from which the Python class will be instantiated. Make sure to only load
                configuration files of compatible classes.
            return_unused_kwargs (`bool`, *optional*, defaults to `False`):
                Whether kwargs that are not consumed by the Python class should be returned or not.

            kwargs (remaining dictionary of keyword arguments, *optional*):
                Can be used to update the configuration object (after it being loaded) and initiate the Python class.
                `**kwargs` will be directly passed to the underlying scheduler/model's `__init__` method and eventually
                overwrite same named arguments of `config`.

        Returns:
            The instantiated object, or a tuple `(object, unused_kwargs)` if `return_unused_kwargs` is `True`.

        Raises:
            ValueError: If no config is provided.

        Examples:

        ```python
        >>> from ppdiffusers import DDPMScheduler, DDIMScheduler, PNDMScheduler

        >>> # Download scheduler from BOS and cache.
        >>> scheduler = DDPMScheduler.from_pretrained("google/ddpm-cifar10-32")

        >>> # Instantiate DDIM scheduler class with same config as DDPM
        >>> scheduler = DDIMScheduler.from_config(scheduler.config)

        >>> # Instantiate PNDM scheduler class with same config as DDPM
        >>> scheduler = PNDMScheduler.from_config(scheduler.config)
        ```
        """
        # Deprecated calling convention: the config location passed as `pretrained_model_name_or_path`.
        if "pretrained_model_name_or_path" in kwargs:
            config = kwargs.pop("pretrained_model_name_or_path")

        if config is None:
            raise ValueError("Please make sure to provide a config as the first positional argument.")

        # Deprecated: anything that is not a dict is treated as a model name or path and loaded first.
        if not isinstance(config, dict):
            deprecation_message = "It is deprecated to pass a pretrained model name or path to `from_config`."
            if "Scheduler" in cls.__name__:
                deprecation_message += (
                    f"If you were trying to load a scheduler, please use {cls}.from_pretrained(...) instead."
                    " Otherwise, please make sure to pass a configuration dictionary instead. This functionality will"
                    " be removed in v1.0.0."
                )
            elif "Model" in cls.__name__:
                deprecation_message += (
                    f"If you were trying to load a model, please use {cls}.load_config(...) followed by"
                    f" {cls}.from_config(...) instead. Otherwise, please make sure to pass a configuration dictionary"
                    " instead. This functionality will be removed in v1.0.0."
                )
            deprecate("config-passed-as-path", "1.0.0", deprecation_message, standard_warn=False)
            config, kwargs = cls.load_config(pretrained_model_name_or_path=config, return_unused_kwargs=True, **kwargs)

        init_dict, unused_kwargs, hidden_dict = cls.extract_init_dict(config, **kwargs)

        # `dtype` is accepted by callers but is neither forwarded to `__init__` nor reported as unused.
        if "dtype" in unused_kwargs:
            unused_kwargs.pop("dtype")

        # Deprecated kwargs are still forwarded to `__init__`.
        for deprecated_kwarg in cls._deprecated_kwargs:
            if deprecated_kwarg in unused_kwargs:
                init_dict[deprecated_kwarg] = unused_kwargs.pop(deprecated_kwarg)

        model = cls(**init_dict)

        # Keep config entries that were not passed to `__init__` in the stored config as well.
        model.register_to_config(**hidden_dict)

        # Those hidden entries are also reported back to the caller as unused.
        unused_kwargs = {**unused_kwargs, **hidden_dict}

        if return_unused_kwargs:
            return (model, unused_kwargs)
        return model

    @classmethod
    def get_config_dict(cls, *args, **kwargs):
        """Deprecated alias of [`~ConfigMixin.load_config`]; emits a deprecation notice and forwards all arguments."""
        deprecation_message = (
            f" The function get_config_dict is deprecated. Please use {cls}.load_config instead. This function will be"
            " removed in version v1.0.0"
        )
        deprecate("get_config_dict", "1.0.0", deprecation_message, standard_warn=False)
        return cls.load_config(*args, **kwargs)

    @classmethod
    def load_config(
        cls, pretrained_model_name_or_path: Union[str, os.PathLike], return_unused_kwargs=False, **kwargs
    ) -> Union[Dict[str, Any], Tuple[Dict[str, Any], Dict[str, Any]]]:
        r"""
        Load the configuration dictionary of a class from a local file, a local directory or a remote location.

        Parameters:
            pretrained_model_name_or_path (`str` or `os.PathLike`):
                Can be either:

                    - A string, the *model id* of a model repo on huggingface.co. Valid model ids should have an
                      organization name, like `google/ddpm-celebahq-256`.
                    - A path to a *directory* containing model weights saved using [`~ConfigMixin.save_config`], e.g.,
                      `./my_model_directory/`.
                    - A path to a config *file*.

            return_unused_kwargs (`bool`, *optional*, defaults to `False`):
                Whether to also return the keyword arguments that were not consumed by this method.
            cache_dir (`Union[str, os.PathLike]`, *optional*):
                Path to a directory in which a downloaded pretrained model configuration should be cached if the
                standard cache should not be used.
            subfolder (`str`, *optional*):
                In case the relevant files are located inside a subfolder of the model repo (either remote or
                downloaded locally), you can specify the folder name here. For a local directory, a config file at
                the top level takes precedence over the one in `subfolder`.
            from_hf_hub (bool, *optional*):
                Whether to load from Hugging Face Hub. Defaults to False

        Returns:
            The config dictionary, or a tuple `(config_dict, unused_kwargs)` if `return_unused_kwargs` is `True`.

        Raises:
            ValueError: If the class does not define `config_name`.
            EnvironmentError: If the config file cannot be located, downloaded or parsed as JSON.
        """
        from_hf_hub = kwargs.pop("from_hf_hub", False)
        # The default cache location depends on which backend the file is fetched from.
        if from_hf_hub:
            cache_dir = kwargs.pop("cache_dir", HF_CACHE)
        else:
            cache_dir = kwargs.pop("cache_dir", PPDIFFUSERS_CACHE)
        subfolder = kwargs.pop("subfolder", None)

        pretrained_model_name_or_path = str(pretrained_model_name_or_path)

        if cls.config_name is None:
            raise ValueError(
                "`self.config_name` is not defined. Note that one should not load a config from "
                "`ConfigMixin`. Please make sure to define `config_name` in a class inheriting from `ConfigMixin`"
            )

        if os.path.isfile(pretrained_model_name_or_path):
            config_file = pretrained_model_name_or_path
        elif os.path.isdir(pretrained_model_name_or_path):
            root_candidate = os.path.join(pretrained_model_name_or_path, cls.config_name)
            if os.path.isfile(root_candidate):
                config_file = root_candidate
            elif subfolder is not None and os.path.isfile(
                os.path.join(pretrained_model_name_or_path, subfolder, cls.config_name)
            ):
                config_file = os.path.join(pretrained_model_name_or_path, subfolder, cls.config_name)
            else:
                raise EnvironmentError(
                    f"Error no file named {cls.config_name} found in directory {pretrained_model_name_or_path}."
                )
        elif from_hf_hub:
            config_file = hf_hub_download(
                repo_id=pretrained_model_name_or_path,
                filename=cls.config_name,
                cache_dir=cache_dir,
                subfolder=subfolder,
                library_name="PPDiffusers",
                library_version=__version__,
            )
        else:
            try:
                config_file = ppdiffusers_bos_download(
                    pretrained_model_name_or_path,
                    filename=cls.config_name,
                    subfolder=subfolder,
                    cache_dir=cache_dir,
                )
            except HTTPError as err:
                raise EnvironmentError(
                    "There was a specific connection error when trying to load"
                    f" {pretrained_model_name_or_path}:\n{err}"
                ) from err
            except ValueError as err:
                raise EnvironmentError(
                    f"We couldn't connect to '{DOWNLOAD_SERVER}' to load this model, couldn't find it"
                    f" in the cached files and it looks like {pretrained_model_name_or_path} is not the path to a"
                    f" directory containing a {cls.config_name} file.\nCheckout your internet connection or see how to"
                    " run the library in offline mode at"
                    " 'https://huggingface.co/docs/diffusers/installation#offline-mode'."
                ) from err
            except EnvironmentError as err:
                raise EnvironmentError(
                    f"Can't load config for '{pretrained_model_name_or_path}'. If you were trying to load it from "
                    "'https://huggingface.co/models', make sure you don't have a local directory with the same name. "
                    f"Otherwise, make sure '{pretrained_model_name_or_path}' is the correct path to a directory "
                    f"containing a {cls.config_name} file"
                ) from err

        try:
            config_dict = cls._dict_from_json_file(config_file)
        except (json.JSONDecodeError, UnicodeDecodeError) as err:
            raise EnvironmentError(
                f"It looks like the config file at '{config_file}' is not a valid JSON file."
            ) from err

        if return_unused_kwargs:
            return config_dict, kwargs

        return config_dict

    @staticmethod
    def _get_init_keys(cls):
        """Return the set of parameter names of `cls.__init__` (including `self`)."""
        return set(dict(inspect.signature(cls.__init__).parameters).keys())

    @classmethod
    def extract_init_dict(cls, config_dict, **kwargs):
        """
        Split a config dictionary (plus overriding `kwargs`) into what `__init__` accepts and what it does not.

        Args:
            config_dict: Loaded configuration. It is not modified; all filtering happens on copies.
            kwargs: Overrides and extra values. A value given here wins over the one in `config_dict`.

        Returns:
            A tuple `(init_dict, unused_kwargs, hidden_config_dict)`:
                - `init_dict`: arguments to pass to `cls.__init__`.
                - `unused_kwargs`: leftover public config entries merged with leftover `kwargs`.
                - `hidden_config_dict`: every entry of the original `config_dict` that is not in `init_dict`.
        """
        # 0. Keep an untouched copy of the incoming config.
        original_dict = dict(config_dict)

        # 1. The expected config attributes are the parameters of `__init__`, minus `self` and a catch-all `kwargs`.
        expected_keys = cls._get_init_keys(cls)
        expected_keys.remove("self")
        expected_keys.discard("kwargs")

        # 2. Drop the attributes the class asked to keep out of its config.
        if cls.ignore_for_config:
            expected_keys = expected_keys - set(cls.ignore_for_config)

        # Top-level package of this module; used below to look up the class named in `_class_name`.
        ppdiffusers_library = importlib.import_module(__name__.split(".")[0])

        if cls.has_compatibles:
            compatible_classes = [c for c in cls._get_compatibles() if not isinstance(c, DummyObject)]
        else:
            compatible_classes = []

        # Parameters that only compatible classes accept are silently removed from the config.
        expected_keys_comp_cls = set()
        for compatible_cls in compatible_classes:
            expected_keys_comp_cls |= cls._get_init_keys(compatible_cls)
        expected_keys_comp_cls = expected_keys_comp_cls - cls._get_init_keys(cls)
        config_dict = {k: v for k, v in config_dict.items() if k not in expected_keys_comp_cls}

        # Parameters that only the class that originally wrote the config accepts are removed as well.
        orig_cls_name = config_dict.pop("_class_name", cls.__name__)
        if orig_cls_name != cls.__name__ and hasattr(ppdiffusers_library, orig_cls_name):
            orig_cls = getattr(ppdiffusers_library, orig_cls_name)
            unexpected_keys_from_orig = cls._get_init_keys(orig_cls) - expected_keys
            config_dict = {k: v for k, v in config_dict.items() if k not in unexpected_keys_from_orig}

        # Private entries (leading underscore) are never passed to `__init__`.
        config_dict = {k: v for k, v in config_dict.items() if not k.startswith("_")}

        # 3. Build the `__init__` arguments; a value passed via `kwargs` overrides the one from the config.
        init_dict = {}
        for key in expected_keys:
            if key in kwargs and key in config_dict:
                config_dict[key] = kwargs.pop(key)

            if key in kwargs:
                init_dict[key] = kwargs.pop(key)
            elif key in config_dict:
                init_dict[key] = config_dict.pop(key)

        # 4. Warn about config entries nobody consumed.
        if config_dict:
            logger.warning(
                f"The config attributes {config_dict} were passed to {cls.__name__}, "
                "but are not expected and will be ignored. Please verify your "
                f"{cls.config_name} configuration file."
            )

        # 5. Report expected attributes that fall back to their defaults.
        missing_keys = expected_keys - set(init_dict.keys())
        if missing_keys:
            logger.info(
                f"{missing_keys} was not found in config. Values will be initialized to default values."
            )

        # 6. Everything left over is reported as unused.
        unused_kwargs = {**config_dict, **kwargs}

        # 7. Entries of the original config that did not end up in `init_dict`.
        hidden_config_dict = {k: v for k, v in original_dict.items() if k not in init_dict}

        return init_dict, unused_kwargs, hidden_config_dict

    @classmethod
    def _dict_from_json_file(cls, json_file: Union[str, os.PathLike]):
        """Read the UTF-8 encoded file `json_file` and return its parsed JSON content."""
        with open(json_file, "r", encoding="utf-8") as reader:
            text = reader.read()
        return json.loads(text)

    def __repr__(self):
        return f"{self.__class__.__name__} {self.to_json_string()}"

    @property
    def config(self) -> Dict[str, Any]:
        """
        Returns the config of the class as a frozen dictionary

        Returns:
            `Dict[str, Any]`: Config of the class.
        """
        return self._internal_dict

    def to_json_string(self) -> str:
        """
        Serializes this instance to a JSON string.

        Returns:
            `str`: String containing all the attributes that make up this configuration instance in JSON format.
        """
        # NOTE(review): when `_internal_dict` exists, the two bookkeeping keys below are written into it in place
        # (no copy is made), so they become part of `self.config` after the first call. Kept as is because
        # callers may rely on those keys being present.
        config_dict = self._internal_dict if hasattr(self, "_internal_dict") else {}
        config_dict["_class_name"] = self.__class__.__name__
        config_dict["_ppdiffusers_version"] = __version__

        def to_json_saveable(value):
            # numpy arrays are not JSON serializable; store them as nested lists.
            if isinstance(value, np.ndarray):
                value = value.tolist()
            return value

        config_dict = {k: to_json_saveable(v) for k, v in config_dict.items()}
        return json.dumps(config_dict, indent=2, sort_keys=True) + "\n"

    def to_json_file(self, json_file_path: Union[str, os.PathLike]):
        """
        Save this instance to a JSON file.

        Args:
            json_file_path (`str` or `os.PathLike`):
                Path to the JSON file in which this configuration instance's parameters will be saved.
        """
        with open(json_file_path, "w", encoding="utf-8") as writer:
            writer.write(self.to_json_string())
|
|
|
|
|
def register_to_config(init):
    r"""
    Decorator to apply on the init of classes inheriting from [`ConfigMixin`] so that all the arguments are
    automatically sent to `self.register_to_config`. To ignore a specific argument accepted by the init but that
    shouldn't be registered in the config, use the `ignore_for_config` class variable

    Warning: Once decorated, all private arguments (beginning with an underscore) are trashed and not sent to the init!

    Args:
        init: The `__init__` function to wrap.

    Returns:
        The wrapped init, which registers its arguments before calling the original `init`.

    Raises:
        RuntimeError: At call time, if the instance does not inherit from `ConfigMixin`.
    """

    @functools.wraps(init)
    def inner_init(self, *args, **kwargs):
        if not isinstance(self, ConfigMixin):
            raise RuntimeError(
                f"`@register_to_config` was applied to {self.__class__.__name__} init method, but this class does "
                "not inherit from `ConfigMixin`."
            )

        # Private kwargs (leading underscore) go to the config only, never to the wrapped init.
        init_kwargs = {k: v for k, v in kwargs.items() if not k.startswith("_")}
        config_init_kwargs = {k: v for k, v in kwargs.items() if k.startswith("_")}

        ignore = getattr(self, "ignore_for_config", [])

        # Parameter name -> default value, skipping the first parameter (`self`) and ignored names.
        signature = inspect.signature(init)
        parameters = {
            name: p.default for i, (name, p) in enumerate(signature.parameters.items()) if i > 0 and name not in ignore
        }

        # Positional arguments are matched to the remaining parameter names in declaration order.
        new_kwargs = dict(zip(parameters, args))

        # Everything not given positionally takes the keyword value if passed, otherwise the signature default.
        for name, default in parameters.items():
            if name not in new_kwargs:
                new_kwargs[name] = init_kwargs.get(name, default)

        # Public values win over private kwargs with the same name.
        new_kwargs = {**config_init_kwargs, **new_kwargs}
        self.register_to_config(**new_kwargs)
        init(self, *args, **init_kwargs)

    return inner_init
|
|