File size: 2,616 Bytes
43b7e92 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
import json
import logging
import os
from collections import defaultdict
from pathlib import Path
from huggingface_hub import HfApi, ModelFilter
import diffusers
PATH_TO_REPO = Path(__file__).parent.parent.resolve()
ALWAYS_TEST_PIPELINE_MODULES = [
"controlnet",
"stable_diffusion",
"stable_diffusion_2",
"stable_diffusion_xl",
"stable_diffusion_adapter",
"deepfloyd_if",
"ip_adapters",
"kandinsky",
"kandinsky2_2",
"text_to_video_synthesis",
"wuerstchen",
]
PIPELINE_USAGE_CUTOFF = int(os.getenv("PIPELINE_USAGE_CUTOFF", 50000))
logger = logging.getLogger(__name__)
api = HfApi()
filter = ModelFilter(library="diffusers")
def filter_pipelines(usage_dict, usage_cutoff=10000):
output = []
for diffusers_object, usage in usage_dict.items():
if usage < usage_cutoff:
continue
is_diffusers_pipeline = hasattr(diffusers.pipelines, diffusers_object)
if not is_diffusers_pipeline:
continue
output.append(diffusers_object)
return output
def fetch_pipeline_objects():
models = api.list_models(filter=filter)
downloads = defaultdict(int)
for model in models:
is_counted = False
for tag in model.tags:
if tag.startswith("diffusers:"):
is_counted = True
downloads[tag[len("diffusers:") :]] += model.downloads
if not is_counted:
downloads["other"] += model.downloads
# Remove 0 downloads
downloads = {k: v for k, v in downloads.items() if v > 0}
pipeline_objects = filter_pipelines(downloads, PIPELINE_USAGE_CUTOFF)
return pipeline_objects
def fetch_pipeline_modules_to_test():
try:
pipeline_objects = fetch_pipeline_objects()
except Exception as e:
logger.error(e)
raise RuntimeError("Unable to fetch model list from HuggingFace Hub.")
test_modules = []
for pipeline_name in pipeline_objects:
module = getattr(diffusers, pipeline_name)
test_module = module.__module__.split(".")[-2].strip()
test_modules.append(test_module)
return test_modules
def main():
test_modules = fetch_pipeline_modules_to_test()
test_modules.extend(ALWAYS_TEST_PIPELINE_MODULES)
# Get unique modules
test_modules = list(set(test_modules))
print(json.dumps(test_modules))
save_path = f"{PATH_TO_REPO}/reports"
os.makedirs(save_path, exist_ok=True)
with open(f"{save_path}/test-pipelines.json", "w") as f:
json.dump({"pipeline_test_modules": test_modules}, f)
if __name__ == "__main__":
main()
|