# NOTE: `pipes` is assumed here to be a project-local module that exposes
# pipeline callables such as `sentiment_tags` and `entity_pos_tagger`; it is
# not the deprecated standard-library `pipes` module.
import pipes


class ModelManager:
    """Registry that loads, trains, tests, and runs pipeline callables."""

    def __init__(self):
        # Maps each model name to its pipeline callable and default arguments.
        self.models = {}

    def list_models(self):
        return list(self.models.keys())

    def add_model(self, pipe_func, model_name, args):
        self.models[model_name] = {"pipeline": pipe_func, "args": args}

    def load_transformers_model(self, model_name, args):
        # Look up the pipeline callable by name in the `pipes` module.
        if hasattr(pipes, model_name):
            pipe_func = getattr(pipes, model_name)
            self.add_model(pipe_func, model_name, args)
        else:
            print(f"Error: {model_name} is not defined in the pipes module.")

    def train_transformers_model(self, model_name, train_dataset, eval_dataset, training_args):
        if model_name not in self.models:
            print(f"Error: {model_name} is not in the list of available models.")
            return

        # Assumes the object stored under "pipeline" exposes a `train` method;
        # a plain function taken from `pipes` would need to be wrapped first.
        pipeline = self.models[model_name]["pipeline"]
        pipeline.train(train_dataset=train_dataset, eval_dataset=eval_dataset, training_args=training_args)

    def test_model(self, model_name, test_dataset):
        if model_name not in self.models:
            print(f"Error: {model_name} is not in the list of available models.")
            return None

        # Likewise assumes the stored pipeline object exposes a `test` method.
        pipeline = self.models[model_name]["pipeline"]
        return pipeline.test(test_dataset)

    def remove_model(self, model_name):
        if model_name in self.models:
            del self.models[model_name]
        else:
            print(f"Error: {model_name} is not in the list of available models.")

    def execute_model(self, model_name, *args, **kwargs):
        if model_name not in self.models:
            print(f"Error: {model_name} is not in the list of available models.")
            return None

        pipe_func = self.models[model_name]["pipeline"]
        # Stored args are a dict of default keyword arguments (see the usage
        # below, which registers models with args={}); call-time kwargs
        # override them, and positional arguments are forwarded unchanged.
        stored_kwargs = self.models[model_name]["args"]
        return pipe_func(*args, **{**stored_kwargs, **kwargs})

    def choose_best_pipeline(self, prompt, task):
        # Score every registered pipeline on the task and return the name of
        # the best one, or None when no models are registered.
        best_pipeline = None
        best_score = float("-inf")

        for pipeline_name in self.models:
            pipeline = self.models[pipeline_name]["pipeline"]
            score = self.evaluate_pipeline(pipeline, prompt, task)
            if score > best_score:
                best_score = score
                best_pipeline = pipeline_name

        return best_pipeline

    def evaluate_pipeline(self, pipeline, prompt, task):
        # `prompt` is currently unused; scoring relies on a small built-in
        # benchmark per task.
        if task == "sentiment_analysis":
            # Tiny smoke-test dataset; assumes the pipeline callable returns
            # the predicted label string directly for a given text.
            test_dataset = [("Test text 1", "positive"), ("Test text 2", "negative")]
            correct_predictions = 0
            total_predictions = len(test_dataset)

            for text, label in test_dataset:
                prediction = pipeline(text)
                if prediction == label:
                    correct_predictions += 1

            accuracy = correct_predictions / total_predictions
            return accuracy
        else:
            # No benchmark defined for other tasks yet; return a neutral score.
            return 0.5


if __name__ == "__main__":
    manager = ModelManager()

    # Register two pipelines that the `pipes` module is expected to provide.
    manager.load_transformers_model("sentiment_tags", args={})
    manager.load_transformers_model("entity_pos_tagger", args={})

    prompt = "This is a sample text for sentiment analysis."
    task = "sentiment_analysis"
    best_pipeline = manager.choose_best_pipeline(prompt, task)
    print(f"The best pipeline for {task} is: {best_pipeline}")