File size: 3,874 Bytes
a164e13
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Copyright (C) 2024 Charles O. Goddard
#
# This software is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.

import functools
import typing
from typing import Any, Callable, Optional, Union

import click
from click.core import Context, Parameter
from pydantic import BaseModel

from mergekit.common import parse_kmb


class MergeOptions(BaseModel):
    """Settings that control how a merge is run and how its output is written.

    Every field has a default, so ``MergeOptions()`` is valid with no
    arguments. ``add_merge_options`` in this module exposes each field as a
    command-line option (underscores become dashes) and hands the populated
    model to the wrapped callback as ``merge_options``. The declaration order
    of the fields is therefore significant: it is the order in which the
    options are registered.
    """

    # Allow mixing architectures.
    allow_crimes: bool = False
    # Override storage path for downloaded models; None leaves it unset.
    transformers_cache: Optional[str] = None
    # Path to store merged LoRA models; None leaves it unset.
    lora_merge_cache: Optional[str] = None
    # Perform matrix arithmetic on GPU.
    cuda: bool = False
    # Store results and intermediate values on GPU (useful if VRAM > RAM).
    low_cpu_memory: bool = False
    # Number of parameters per output shard; the default is parse_kmb("5B").
    out_shard_size: int = parse_kmb("5B")
    # Copy a tokenizer to the output.
    copy_tokenizer: bool = True
    # Clone tensors before saving, to allow multiple occurrences of a layer.
    clone_tensors: bool = False
    # Trust remote code from Hugging Face repos (dangerous).
    trust_remote_code: bool = False
    # Seed for reproducible use of randomized merge methods; None = unset.
    random_seed: Optional[int] = None
    # Experimental lazy unpickler for lower memory usage.
    lazy_unpickle: bool = False
    # Output a README.md containing details of the merge.
    write_model_card: bool = True
    # Save output in safetensors format rather than pickled models.
    safe_serialization: bool = True


# Help text for the generated command-line options, keyed by the name of the
# ``MergeOptions`` field each option is derived from. Insertion order matches
# the field declaration order.
OPTION_HELP = dict(
    allow_crimes="Allow mixing architectures",
    transformers_cache="Override storage path for downloaded models",
    lora_merge_cache="Path to store merged LORA models",
    cuda="Perform matrix arithmetic on GPU",
    low_cpu_memory=(
        "Store results and intermediate values on GPU. Useful if VRAM > RAM"
    ),
    # The default is spelled out by hand here because the option is registered
    # with show_default disabled (the raw integer default is less readable).
    out_shard_size="Number of parameters per output shard  [default: 5B]",
    copy_tokenizer="Copy a tokenizer to the output",
    clone_tensors=(
        "Clone tensors before saving, to allow multiple occurrences of the same layer"
    ),
    trust_remote_code="Trust remote code from huggingface repos (danger)",
    random_seed="Seed for reproducible use of randomized merge methods",
    lazy_unpickle="Experimental lazy unpickler for lower memory usage",
    write_model_card="Output README.md containing details of the merge",
    safe_serialization=(
        "Save output in safetensors. Do this, don't poison the world with more pickled models."
    ),
)


class ShardSizeParamType(click.ParamType):
    """Click parameter type that converts a shard size using ``parse_kmb``.

    Used for the ``--out-shard-size`` option so that values such as ``"5B"``
    (the form used for the ``MergeOptions.out_shard_size`` default) are
    turned into an integer.
    """

    # Name click uses for this type in help output and error messages.
    name = "size"

    def convert(
        self, value: Any, param: Optional[Parameter], ctx: Optional[Context]
    ) -> int:
        """Convert ``value`` to an integer size.

        Args:
            value: Raw value supplied by click (command-line text or the
                option's default).
            param: The parameter being converted, if any; used for error
                reporting.
            ctx: The active click context, if any; used for error reporting.

        Returns:
            The result of ``parse_kmb(value)``.

        Raises:
            click.BadParameter: Raised via ``self.fail`` when ``parse_kmb``
                rejects the value with a ``ValueError``.
        """
        try:
            return parse_kmb(value)
        except ValueError:
            # NOTE(review): assumes parse_kmb signals malformed input with
            # ValueError - confirm against mergekit.common. Reporting through
            # self.fail gives the user a normal click usage error instead of
            # a traceback.
            self.fail(f"{value!r} is not a valid size", param, ctx)


def add_merge_options(f: Callable) -> Callable:
    """Attach one click option per ``MergeOptions`` field to a callback.

    The returned callable accepts the generated options as keyword
    arguments, gathers them into a single ``MergeOptions`` instance and
    calls ``f`` with that instance passed as the ``merge_options`` keyword
    argument. All other positional and keyword arguments are forwarded
    unchanged.

    Args:
        f: The callback to wrap. It must accept a ``merge_options`` keyword
            argument.

    Returns:
        The wrapped callback, decorated with the generated ``click.option``
        declarations. Calling it returns whatever ``f`` returns.

    Raises:
        TypeError: If a ``MergeOptions`` field is annotated with a ``Union``
            that is not simply ``Optional[X]``.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # Move every keyword that corresponds to a MergeOptions field out of
        # kwargs; fields that were not supplied fall back to model defaults.
        arg_dict = {}
        for field_name in MergeOptions.model_fields:
            if field_name in kwargs:
                arg_dict[field_name] = kwargs.pop(field_name)

        kwargs["merge_options"] = MergeOptions(**arg_dict)
        # Propagate the callback's result instead of silently dropping it.
        return f(*args, **kwargs)

    none_type = type(None)
    # Decorators are applied innermost-first, so walk the fields in reverse.
    # NOTE(review): presumably this keeps the options in declaration order in
    # the generated --help output - confirm against click's ordering rules.
    for field_name, info in reversed(MergeOptions.model_fields.items()):
        field_type = info.annotation
        if typing.get_origin(field_type) is Union:
            # Unwrap Optional[X] to X, regardless of where None appears.
            non_none = [
                arg for arg in typing.get_args(field_type) if arg is not none_type
            ]
            if len(non_none) != 1:
                raise TypeError(
                    f"Unsupported annotation for option {field_name!r}: "
                    f"{info.annotation!r}"
                )
            field_type = non_none[0]

        if field_name == "out_shard_size":
            # Shard sizes accept suffixed values, parsed by the custom type.
            field_type = ShardSizeParamType()

        arg_name = field_name.replace("_", "-")
        if field_type is bool:
            # Booleans become on/off flag pairs, e.g. --cuda/--no-cuda.
            arg_str = f"--{arg_name}/--no-{arg_name}"
        else:
            arg_str = f"--{arg_name}"

        wrapper = click.option(
            arg_str,
            type=field_type,
            default=info.default,
            help=OPTION_HELP.get(field_name),
            # The out_shard_size help text already states its default.
            show_default=field_name != "out_shard_size",
        )(wrapper)

    return wrapper