File size: 7,328 Bytes
d5436e0
4e8158d
d5436e0
 
 
 
e294914
4e8158d
e294914
 
232e081
d5436e0
e294914
d5436e0
 
 
 
 
 
 
 
 
ffd9848
e294914
ccc21ac
d5436e0
fa98157
e294914
d5436e0
ffd9848
d5436e0
 
 
ffd9848
d5436e0
 
5cdab92
c59651a
5cdab92
 
 
d5436e0
5cdab92
d5436e0
 
e294914
 
 
 
 
 
 
 
 
d5436e0
 
 
e294914
d5436e0
e294914
 
 
d5436e0
 
 
2d03ac2
d5436e0
2d03ac2
d5436e0
 
 
 
 
e294914
d5436e0
 
e294914
d5436e0
 
 
ffd9848
 
d5436e0
 
ffd9848
d5436e0
 
 
e294914
 
 
d5436e0
 
e294914
d5436e0
 
e294914
 
 
d5436e0
fa98157
e294914
d5436e0
e294914
d5436e0
 
 
ffd9848
d5436e0
 
5cdab92
e294914
c59651a
5cdab92
e294914
5cdab92
d5436e0
5cdab92
d5436e0
 
e294914
 
 
 
 
 
 
 
 
 
d5436e0
 
 
e294914
d5436e0
e294914
 
 
d5436e0
 
 
2d03ac2
d5436e0
2d03ac2
d5436e0
 
 
 
 
e294914
d5436e0
2d03ac2
e294914
d5436e0
 
 
ffd9848
 
d5436e0
 
ffd9848
d5436e0
 
 
e294914
 
 
d5436e0
 
e294914
d5436e0
 
e294914
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import os
import logging
from abc import ABC

import requests

from langchain_community.llms import LlamaCpp

from llm.utils.config import config
from llm.utils.lc_interface import LCInterface

logger = logging.getLogger(__name__)
# For all modules the template for the log file is
# "logs/chelsea_{module_name}_{dir_name}.log".
# NOTE: the original code called setLevel(ERROR) and then setLevel(INFO);
# only the last call takes effect, so the effective INFO level is kept.
logger.setLevel(logging.INFO)

# FileHandler raises FileNotFoundError when the target directory is missing,
# which would abort module import — create it up front.
os.makedirs("logs", exist_ok=True)
file_handler = logging.FileHandler("logs/chelsea_llm_llamacpp.log")

formatted = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatted)
logger.addHandler(file_handler)

# Models are stored under <cwd>/llm/models — assumes the process is started
# from the project root (TODO confirm against the deployment entry point).
work_dir = os.getcwd()
models_dir = os.path.join(work_dir, "llm/models")
    

class LC_TinyLlama(LCInterface, ABC):
    """TinyLlama-1.1B-Chat GGUF model wrapped for LangChain via llama.cpp.

    The constructor downloads the GGUF file into ``models_dir`` (skipping the
    download when the file is already on disk) and builds a ``LlamaCpp``
    instance from the parameters found in ``config``.
    """

    def __init__(self):
        self.model_config = config["LC_TinyLlama-1.1B-Chat-v1.0-GGUF"]

        path_to_model = os.path.join(models_dir, self.model_config["model_name"])
        # Only download when the file is not present yet.  The previous
        # implementation re-downloaded the model on every start-up: its
        # ``except FileExistsError`` branch was unreachable because
        # open(..., "wb") truncates an existing file instead of raising.
        if os.path.exists(path_to_model):
            print(f"Model file {path_to_model} already exists. Skipping download.")
            logger.info(f"Model file {path_to_model} already exists. Skipping download.")
        else:
            try:
                # Stream the response so the (multi-GB) model is never held
                # in memory as a single bytes object; bound the connection
                # with a timeout so a dead server cannot hang start-up.
                with requests.get(self.model_config["model_url"], stream=True, timeout=60) as get_file:
                    get_file.raise_for_status()
                    with open(path_to_model, "wb") as f:
                        for chunk in get_file.iter_content(chunk_size=1 << 20):
                            f.write(chunk)
                logger.info("Model file successfully recorded")
            except requests.RequestException as e:
                print(f"Error while downloading model file : {e}")
                logger.error(msg="Error while downloading model file", exc_info=e)
            except OSError as e:
                print(f"Error while writing a file to directory : {e}")
                logger.error(msg="Error while write a file to directory", exc_info=e)

        self.llm = LlamaCpp(
            model_path=path_to_model,
            temperature=self.model_config["temperature"],
            max_tokens=self.model_config["max_tokens"],
            top_p=self.model_config["top_p"],
            top_k=self.model_config["top_k"],
            # callback_manager=callback_manager,
            verbose=True,  # Verbose is required to pass to the callback manager
        )

    def execution(self):
        """Return the underlying ``LlamaCpp`` instance, or ``None`` on failure."""
        try:
            return self.llm
        except AttributeError as e:
            # ``self.llm`` can only be missing when __init__ failed part-way.
            print(f"Execution failed in LC_TinyLlama execution function: {e}")
            logger.critical(msg="Execution failed in LC_TinyLlama execution function", exc_info=e)
            return None

    def clear_llm(self, unused_model_dict, current_lc):
        """Delete every model file listed in *unused_model_dict* except *current_lc*.

        Args:
            unused_model_dict: Mapping of file name -> absolute path, possibly
                empty or ``None``.
            current_lc: File name of the model that must be kept on disk.
        """
        # The original condition ``len(d) > 1 or d is not None`` raised a
        # TypeError for None (len() is evaluated first) and wrongly skipped
        # one-entry dicts; plain truthiness covers both None and empty.
        if unused_model_dict:
            for key, value in unused_model_dict.items():
                # Delete only files that exist and are not the active model.
                if os.path.exists(value) and key != current_lc:
                    os.remove(value)
                    logger.info(f"Successfully deleted file {value}")
                    print(f"Successfully deleted file {value}")
        else:
            logger.info(f"Unfortunately dictionary empty or None")
            print(f"Unfortunately dictionary {unused_model_dict} empty or None")

    def get_unused(self, current_lc):
        """Return ``{file_name: absolute_path}`` for the first model file in
        ``models_dir`` that is not *current_lc*, or ``None`` when there is at
        most one file / nothing to clean up."""
        entries = os.listdir(models_dir)
        if len(entries) > 1:
            for item in entries:
                if item != current_lc:
                    return {item: os.path.join(models_dir, item)}
        return None

    def model_name(self):
        """File name of the GGUF model configured for this class."""
        return self.model_config["model_name"]

    def __str__(self):
        return f"{self.__class__.__name__}_{self.model_name()}"

    def __repr__(self):
        llm_info = f"llm={self.llm}" if hasattr(self, 'llm') else 'llm=not initialized'
        return f"{self.__class__.__name__}({llm_info})"

class LC_Phi3(LCInterface, ABC):
    """Phi-3-mini-4k-instruct GGUF model wrapped for LangChain via llama.cpp.

    The constructor downloads the GGUF file into ``models_dir`` (skipping the
    download when the file is already on disk) and builds a ``LlamaCpp``
    instance from the parameters found in ``config``.
    """

    def __init__(self):
        self.model_config = config["LC_Phi-3-mini-4k-instruct-gguf"]

        path_to_model = os.path.join(models_dir, self.model_config["model_name"])
        # Only download when the file is not present yet.  The previous
        # implementation re-downloaded the model on every start-up: its
        # ``except FileExistsError`` branch was unreachable because
        # open(..., "wb") truncates an existing file instead of raising.
        if os.path.exists(path_to_model):
            print(f"Model file {path_to_model} already exists. Skipping download.")
            logger.info(f"Model file {path_to_model} already exists. Skipping download.")
        else:
            try:
                # Stream the response so the (multi-GB) model is never held
                # in memory as a single bytes object; bound the connection
                # with a timeout so a dead server cannot hang start-up.
                with requests.get(self.model_config["model_url"], stream=True, timeout=60) as get_file:
                    get_file.raise_for_status()
                    with open(path_to_model, "wb") as f:
                        for chunk in get_file.iter_content(chunk_size=1 << 20):
                            f.write(chunk)
                logger.info("Model file successfully recorded")
                print("Model file successfully recorded")
            except requests.RequestException as e:
                print(f"Error while downloading model file : {e}")
                logger.error(msg="Error while downloading model file", exc_info=e)
            except OSError as e:
                print(f"Error while writing a file to directory : {e}")
                logger.error(msg="Error while write a file to directory", exc_info=e)

        self.llm = LlamaCpp(
            model_path=path_to_model,
            temperature=self.model_config["temperature"],
            max_tokens=self.model_config["max_tokens"],
            top_p=self.model_config["top_p"],
            top_k=self.model_config["top_k"],
            # callback_manager=callback_manager,
            verbose=True,  # Verbose is required to pass to the callback manager
        )

    def execution(self):
        """Return the underlying ``LlamaCpp`` instance, or ``None`` on failure."""
        try:
            return self.llm
        except AttributeError as e:
            # ``self.llm`` can only be missing when __init__ failed part-way.
            print(f"Execution failed in LC_Phi3 execution function: {e}")
            logger.critical(msg="Execution failed in LC_Phi3 execution function:", exc_info=e)
            return None

    def clear_llm(self, unused_model_dict, current_lc):
        """Delete every model file listed in *unused_model_dict* except *current_lc*.

        Args:
            unused_model_dict: Mapping of file name -> absolute path, possibly
                empty or ``None``.
            current_lc: File name of the model that must be kept on disk.
        """
        # The original condition ``len(d) > 1 or d is not None`` raised a
        # TypeError for None (len() is evaluated first) and wrongly skipped
        # one-entry dicts; plain truthiness covers both None and empty.
        if unused_model_dict:
            for key, value in unused_model_dict.items():
                # Delete only files that exist and are not the active model.
                if os.path.exists(value) and key != current_lc:
                    os.remove(value)
                    logger.info(f"Successfully deleted file {value}")
                    print(f"Successfully deleted file {value}")
        else:
            logger.info(f"Unfortunately dictionary empty or None")
            print(f"Unfortunately dictionary {unused_model_dict} empty or None")

    def get_unused(self, current_lc):
        """Return ``{file_name: absolute_path}`` for the first model file in
        ``models_dir`` that is not *current_lc*, or ``None`` when there is at
        most one file / nothing to clean up."""
        entries = os.listdir(models_dir)
        if len(entries) > 1:
            for item in entries:
                if item != current_lc:
                    return {item: os.path.join(models_dir, item)}
        return None

    def model_name(self):
        """File name of the GGUF model configured for this class."""
        return self.model_config["model_name"]

    def __str__(self):
        return f"{self.__class__.__name__}_{self.model_name()}"

    def __repr__(self):
        llm_info = f"llm={self.llm}" if hasattr(self, 'llm') else 'llm=not initialized'
        return f"{self.__class__.__name__}({llm_info})"