File size: 13,870 Bytes
ed7a497
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A script to add and/or update the attribute `pipeline_model_mapping` in model test files.

This script will be (mostly) used in the following 2 situations:

  - run within a (scheduled) CI job to:
    - check if model test files in the library have updated `pipeline_model_mapping`,
    - and/or update test files and (possibly) open a GitHub pull request automatically
  - being run by a `transformers` member to quickly check and update some particular test file(s)

This script is **NOT** intended to be run (manually) by community contributors.
"""


import argparse
import glob
import inspect
import os
import re
import unittest

from get_test_info import get_test_classes

from tests.test_pipeline_mixin import pipeline_test_mapping


PIPELINE_TEST_MAPPING = {}
for task, _ in pipeline_test_mapping.items():
    PIPELINE_TEST_MAPPING[task] = {"pt": None, "tf": None}


# DO **NOT** add item to this set (unless the reason is approved)
TEST_FILE_TO_IGNORE = {
    "tests/models/esm/test_modeling_esmfold.py",  # The pipeline test mapping is added to `test_modeling_esm.py`
}


def get_framework(test_class):
    """Infer the framework from the test class `test_class`."""

    if "ModelTesterMixin" in [x.__name__ for x in test_class.__bases__]:
        return "pt"
    elif "TFModelTesterMixin" in [x.__name__ for x in test_class.__bases__]:
        return "tf"
    elif "FlaxModelTesterMixin" in [x.__name__ for x in test_class.__bases__]:
        return "flax"
    else:
        return None


def get_mapping_for_task(task, framework):
    """Get mappings defined in `XXXPipelineTests` for the task `task`."""
    # Use the cached results
    if PIPELINE_TEST_MAPPING[task].get(framework, None) is not None:
        return PIPELINE_TEST_MAPPING[task][framework]

    pipeline_test_class = pipeline_test_mapping[task]["test"]
    mapping = None

    if framework == "pt":
        mapping = getattr(pipeline_test_class, "model_mapping", None)
    elif framework == "tf":
        mapping = getattr(pipeline_test_class, "tf_model_mapping", None)

    if mapping is not None:
        mapping = dict(mapping.items())

    # cache the results
    PIPELINE_TEST_MAPPING[task][framework] = mapping
    return mapping


def get_model_for_pipeline_test(test_class, task):
    """Get the model architecture(s) related to the test class `test_class` for a pipeline `task`."""
    framework = get_framework(test_class)
    if framework is None:
        return None
    mapping = get_mapping_for_task(task, framework)
    if mapping is None:
        return None

    config_classes = list({model_class.config_class for model_class in test_class.all_model_classes})
    if len(config_classes) != 1:
        raise ValueError("There should be exactly one configuration class from `test_class.all_model_classes`.")

    # This could be a list/tuple of model classes, but it's rare.
    model_class = mapping.get(config_classes[0], None)
    if isinstance(model_class, (tuple, list)):
        model_class = sorted(model_class, key=lambda x: x.__name__)

    return model_class


def get_pipeline_model_mapping(test_class):
    """Get `pipeline_model_mapping` for `test_class`."""
    mapping = [(task, get_model_for_pipeline_test(test_class, task)) for task in pipeline_test_mapping]
    mapping = sorted([(task, model) for task, model in mapping if model is not None], key=lambda x: x[0])

    return dict(mapping)


def get_pipeline_model_mapping_string(test_class):
    """Get `pipeline_model_mapping` for `test_class` as a string (to be added to the test file).

    This will be a 1-line string. After this is added to a test file, `make style` will format it beautifully.
    """
    framework = get_framework(test_class)
    if framework == "pt":
        framework = "torch"
    default_value = "{}"

    mapping = get_pipeline_model_mapping(test_class)
    if len(mapping) == 0:
        return ""

    texts = []
    for task, model_classes in mapping.items():
        if isinstance(model_classes, (tuple, list)):
            # A list/tuple of model classes
            value = "(" + ", ".join([x.__name__ for x in model_classes]) + ")"
        else:
            # A single model class
            value = model_classes.__name__
        texts.append(f'"{task}": {value}')
    text = "{" + ", ".join(texts) + "}"
    text = f"pipeline_model_mapping = {text} if is_{framework}_available() else {default_value}"

    return text


def is_valid_test_class(test_class):
    """Restrict to `XXXModelTesterMixin` and should be a subclass of `unittest.TestCase`."""
    base_class_names = {"ModelTesterMixin", "TFModelTesterMixin", "FlaxModelTesterMixin"}
    if not issubclass(test_class, unittest.TestCase):
        return False
    return len(base_class_names.intersection([x.__name__ for x in test_class.__bases__])) > 0


def find_test_class(test_file):
    """Find a test class in `test_file` to which we will add `pipeline_model_mapping`."""
    test_classes = [x for x in get_test_classes(test_file) if is_valid_test_class(x)]

    target_test_class = None
    for test_class in test_classes:
        # If a test class has defined `pipeline_model_mapping`, let's take it
        if getattr(test_class, "pipeline_model_mapping", None) is not None:
            target_test_class = test_class
            break
    # Take the test class with the shortest name (just a heuristic)
    if target_test_class is None and len(test_classes) > 0:
        target_test_class = sorted(test_classes, key=lambda x: (len(x.__name__), x.__name__))[0]

    return target_test_class


def find_block_ending(lines, start_idx, indent_level):
    end_idx = start_idx
    for idx, line in enumerate(lines[start_idx:]):
        indent = len(line) - len(line.lstrip())
        if idx == 0 or indent > indent_level or (indent == indent_level and line.strip() == ")"):
            end_idx = start_idx + idx
        elif idx > 0 and indent <= indent_level:
            # Outside the definition block of `pipeline_model_mapping`
            break

    return end_idx


def add_pipeline_model_mapping(test_class, overwrite=False):
    """Add `pipeline_model_mapping` to `test_class`."""
    if getattr(test_class, "pipeline_model_mapping", None) is not None:
        if not overwrite:
            return "", -1

    line_to_add = get_pipeline_model_mapping_string(test_class)
    if len(line_to_add) == 0:
        return "", -1
    line_to_add = line_to_add + "\n"

    # The code defined the class `test_class`
    class_lines, class_start_line_no = inspect.getsourcelines(test_class)
    # `inspect` gives the code for an object, including decorator(s) if any.
    # We (only) need the exact line of the class definition.
    for idx, line in enumerate(class_lines):
        if line.lstrip().startswith("class "):
            class_lines = class_lines[idx:]
            class_start_line_no += idx
            break
    class_end_line_no = class_start_line_no + len(class_lines) - 1

    # The index in `class_lines` that starts the definition of `all_model_classes`, `all_generative_model_classes` or
    # `pipeline_model_mapping`. This assumes they are defined in such order, and we take the start index of the last
    # block that appears in a `test_class`.
    start_idx = None
    # The indent level of the line at `class_lines[start_idx]` (if defined)
    indent_level = 0
    # To record if `pipeline_model_mapping` is found in `test_class`.
    def_line = None
    for idx, line in enumerate(class_lines):
        if line.strip().startswith("all_model_classes = "):
            indent_level = len(line) - len(line.lstrip())
            start_idx = idx
        elif line.strip().startswith("all_generative_model_classes = "):
            indent_level = len(line) - len(line.lstrip())
            start_idx = idx
        elif line.strip().startswith("pipeline_model_mapping = "):
            indent_level = len(line) - len(line.lstrip())
            start_idx = idx
            def_line = line
            break

    if start_idx is None:
        return "", -1
    # Find the ending index (inclusive) of the above found block.
    end_idx = find_block_ending(class_lines, start_idx, indent_level)

    # Extract `is_xxx_available()` from existing blocks: some models require specific libraries like `timm` and use
    # `is_timm_available()` instead of `is_torch_available()`.
    # Keep leading and trailing whitespaces
    r = re.compile(r"\s(is_\S+?_available\(\))\s")
    for line in class_lines[start_idx : end_idx + 1]:
        backend_condition = r.search(line)
        if backend_condition is not None:
            # replace the leading and trailing whitespaces to the space character " ".
            target = " " + backend_condition[0][1:-1] + " "
            line_to_add = r.sub(target, line_to_add)
            break

    if def_line is None:
        # `pipeline_model_mapping` is not defined. The target index is set to the ending index (inclusive) of
        # `all_model_classes` or `all_generative_model_classes`.
        target_idx = end_idx
    else:
        # `pipeline_model_mapping` is defined. The target index is set to be one **BEFORE** its start index.
        target_idx = start_idx - 1
        # mark the lines of the currently existing `pipeline_model_mapping` to be removed.
        for idx in range(start_idx, end_idx + 1):
            # These lines are going to be removed before writing to the test file.
            class_lines[idx] = None  # noqa

    # Make sure the test class is a subclass of `PipelineTesterMixin`.
    parent_classes = [x.__name__ for x in test_class.__bases__]
    if "PipelineTesterMixin" not in parent_classes:
        # Put `PipelineTesterMixin` just before `unittest.TestCase`
        _parent_classes = [x for x in parent_classes if x != "TestCase"] + ["PipelineTesterMixin"]
        if "TestCase" in parent_classes:
            # Here we **assume** the original string is always with `unittest.TestCase`.
            _parent_classes.append("unittest.TestCase")
        parent_classes = ", ".join(_parent_classes)
        for idx, line in enumerate(class_lines):
            # Find the ending of the declaration of `test_class`
            if line.strip().endswith("):"):
                # mark the lines of the declaration of `test_class` to be removed
                for _idx in range(idx + 1):
                    class_lines[_idx] = None  # noqa
                break
        # Add the new, one-line, class declaration for `test_class`
        class_lines[0] = f"class {test_class.__name__}({parent_classes}):\n"

    # Add indentation
    line_to_add = " " * indent_level + line_to_add
    # Insert `pipeline_model_mapping` to `class_lines`.
    # (The line at `target_idx` should be kept by definition!)
    class_lines = class_lines[: target_idx + 1] + [line_to_add] + class_lines[target_idx + 1 :]
    # Remove the lines that are marked to be removed
    class_lines = [x for x in class_lines if x is not None]

    # Move from test class to module (in order to write to the test file)
    module_lines = inspect.getsourcelines(inspect.getmodule(test_class))[0]
    # Be careful with the 1-off between line numbers and array indices
    module_lines = module_lines[: class_start_line_no - 1] + class_lines + module_lines[class_end_line_no:]
    code = "".join(module_lines)

    moddule_file = inspect.getsourcefile(test_class)
    with open(moddule_file, "w", encoding="UTF-8", newline="\n") as fp:
        fp.write(code)

    return line_to_add


def add_pipeline_model_mapping_to_test_file(test_file, overwrite=False):
    """Add `pipeline_model_mapping` to `test_file`."""
    test_class = find_test_class(test_file)
    if test_class:
        add_pipeline_model_mapping(test_class, overwrite=overwrite)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--test_file", type=str, help="A path to the test file, starting with the repository's `tests` directory."
    )
    parser.add_argument(
        "--all",
        action="store_true",
        help="If to check and modify all test files.",
    )
    parser.add_argument(
        "--overwrite",
        action="store_true",
        help="If to overwrite a test class if it has already defined `pipeline_model_mapping`.",
    )
    args = parser.parse_args()

    if not args.all and not args.test_file:
        raise ValueError("Please specify either `test_file` or pass `--all` to check/modify all test files.")
    elif args.all and args.test_file:
        raise ValueError("Only one of `--test_file` and `--all` could be specified.")

    test_files = []
    if args.test_file:
        test_files = [args.test_file]
    else:
        pattern = os.path.join("tests", "models", "**", "test_modeling_*.py")
        for test_file in glob.glob(pattern):
            # `Flax` is not concerned at this moment
            if not test_file.startswith("test_modeling_flax_"):
                test_files.append(test_file)

    for test_file in test_files:
        if test_file in TEST_FILE_TO_IGNORE:
            print(f"[SKIPPED] {test_file} is skipped as it is in `TEST_FILE_TO_IGNORE` in the file {__file__}.")
            continue
        add_pipeline_model_mapping_to_test_file(test_file, overwrite=args.overwrite)