File size: 2,616 Bytes
43b7e92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import json
import logging
import os
from collections import defaultdict
from pathlib import Path

from huggingface_hub import HfApi, ModelFilter

import diffusers


PATH_TO_REPO = Path(__file__).parent.parent.resolve()
ALWAYS_TEST_PIPELINE_MODULES = [
    "controlnet",
    "stable_diffusion",
    "stable_diffusion_2",
    "stable_diffusion_xl",
    "stable_diffusion_adapter",
    "deepfloyd_if",
    "ip_adapters",
    "kandinsky",
    "kandinsky2_2",
    "text_to_video_synthesis",
    "wuerstchen",
]
PIPELINE_USAGE_CUTOFF = int(os.getenv("PIPELINE_USAGE_CUTOFF", 50000))

logger = logging.getLogger(__name__)
api = HfApi()
filter = ModelFilter(library="diffusers")


def filter_pipelines(usage_dict, usage_cutoff=10000):
    output = []
    for diffusers_object, usage in usage_dict.items():
        if usage < usage_cutoff:
            continue

        is_diffusers_pipeline = hasattr(diffusers.pipelines, diffusers_object)
        if not is_diffusers_pipeline:
            continue

        output.append(diffusers_object)

    return output


def fetch_pipeline_objects():
    models = api.list_models(filter=filter)
    downloads = defaultdict(int)

    for model in models:
        is_counted = False
        for tag in model.tags:
            if tag.startswith("diffusers:"):
                is_counted = True
                downloads[tag[len("diffusers:") :]] += model.downloads

        if not is_counted:
            downloads["other"] += model.downloads

    # Remove 0 downloads
    downloads = {k: v for k, v in downloads.items() if v > 0}
    pipeline_objects = filter_pipelines(downloads, PIPELINE_USAGE_CUTOFF)

    return pipeline_objects


def fetch_pipeline_modules_to_test():
    try:
        pipeline_objects = fetch_pipeline_objects()
    except Exception as e:
        logger.error(e)
        raise RuntimeError("Unable to fetch model list from HuggingFace Hub.")

    test_modules = []
    for pipeline_name in pipeline_objects:
        module = getattr(diffusers, pipeline_name)

        test_module = module.__module__.split(".")[-2].strip()
        test_modules.append(test_module)

    return test_modules


def main():
    test_modules = fetch_pipeline_modules_to_test()
    test_modules.extend(ALWAYS_TEST_PIPELINE_MODULES)

    # Get unique modules
    test_modules = list(set(test_modules))
    print(json.dumps(test_modules))

    save_path = f"{PATH_TO_REPO}/reports"
    os.makedirs(save_path, exist_ok=True)

    with open(f"{save_path}/test-pipelines.json", "w") as f:
        json.dump({"pipeline_test_modules": test_modules}, f)


if __name__ == "__main__":
    main()