import json
import re
from typing import Optional, Dict, Any, Union, List, Tuple
from pydantic import BaseModel, Field, validator
from huggingface_hub import InferenceClient
from huggingface_hub.errors import HfHubHTTPError
from variables import *
from metaprompt_router import metaprompt_router
class LLMResponse(BaseModel):
    """Validated container for one prompt-refinement result.

    The three text fields are normalised by ``clean_text_fields``;
    ``response_content`` carries the raw payload and is decoded from JSON
    when it arrives as a string.
    """

    initial_prompt_evaluation: str = Field(..., description="Evaluation of the initial prompt")
    refined_prompt: str = Field(..., description="The refined version of the prompt")
    explanation_of_refinements: Union[str, List[str]] = Field(..., description="Explanation of the refinements made")
    response_content: Optional[Union[Dict[str, Any], str]] = Field(None, description="Raw response content")

    @validator('response_content', pre=True)
    def validate_response_content(cls, v):
        """Decode a string payload as JSON, wrapping it when it is not JSON.

        Non-string values are passed through untouched.
        """
        if not isinstance(v, str):
            return v
        try:
            decoded = json.loads(v)
        except json.JSONDecodeError:
            # Keep the undecodable text so nothing is lost.
            decoded = {"raw_content": v}
        return decoded

    @validator('initial_prompt_evaluation', 'refined_prompt', 'explanation_of_refinements')
    def clean_text_fields(cls, v):
        """Trim text and turn literal ``\\n`` / ``\\"`` sequences into real characters.

        For a list, only string items are kept and bullet characters are
        additionally replaced by dashes. Any other value is returned as-is.
        """
        def unescape(text):
            # Literal backslash sequences become the characters they stand for.
            return text.strip().replace('\\n', '\n').replace('\\"', '"')

        if isinstance(v, str):
            return unescape(v)
        if isinstance(v, list):
            return [
                unescape(entry).replace('•', '-')
                for entry in v
                if isinstance(entry, str)
            ]
        return v
class PromptRefiner:
    """Refine prompts through a hosted chat model and parse its replies.

    The instance holds an ``InferenceClient`` plus two dictionaries keyed by
    meta-prompt name: the meta-prompt templates and their explanations.
    Every public method reports failures as an ``"Error..."`` string in its
    return value instead of raising.
    """

    # Wrappers the model may put around its JSON payload, most specific first.
    # NOTE(review): the <json>...</json> convention is an assumption (the
    # previous patterns carried no delimiters at all and therefore always
    # matched the empty string) - confirm against the meta-prompt templates.
    _JSON_WRAPPERS = (
        re.compile(r'<json>\s*(.*?)\s*</json>', re.DOTALL | re.IGNORECASE),
        re.compile(r'```(?:json)?\s*(\{.*?\})\s*```', re.DOTALL | re.IGNORECASE),
    )

    def __init__(self, api_token: str, meta_prompts: dict, metaprompt_explanations: dict):
        """Create the inference client and remember the meta-prompt tables.

        Args:
            api_token: Token handed to ``InferenceClient``.
            meta_prompts: Mapping of meta-prompt key to template text.
            metaprompt_explanations: Mapping of meta-prompt key to explanation.
        """
        self.client = InferenceClient(token=api_token, timeout=120)
        self.meta_prompts = meta_prompts
        self.metaprompt_explanations = metaprompt_explanations

    @classmethod
    def _json_candidates(cls, text: str) -> List[str]:
        """Return substrings of *text* that may hold the JSON payload.

        Candidates are ordered from most to least specific: tagged block,
        fenced block, then the span from the first ``{`` to the last ``}``.
        """
        candidates = []
        for wrapper in cls._JSON_WRAPPERS:
            match = wrapper.search(text)
            if match and match.group(1):
                candidates.append(match.group(1))
        start, end = text.find('{'), text.rfind('}')
        if start != -1 and end > start:
            candidates.append(text[start:end + 1])
        return candidates

    def _clean_json_string(self, content: str, unescape_quotes: bool = True) -> str:
        """Clean and prepare JSON string for parsing.

        Bullet characters become dashes and every whitespace run collapses to
        a single space. With ``unescape_quotes`` (the default, matching the
        historical behaviour) ``\\"`` is turned into ``"``; pass ``False`` to
        keep escapes intact so that valid JSON stays valid.
        """
        content = content.replace('•', '-')
        content = re.sub(r'\s+', ' ', content)
        if unescape_quotes:
            content = content.replace('\\"', '"')
        return content.strip()

    def _loads_lenient(self, candidate: str) -> Any:
        """Decode *candidate* as JSON, trying the least destructive cleanup first.

        Raises:
            json.JSONDecodeError: If neither cleanup variant yields valid JSON.
        """
        last_error = None
        # Escapes are preserved on the first attempt: unescaping quotes breaks
        # any valid JSON string that legitimately contains \" sequences.
        for unescape in (False, True):
            cleaned = self._clean_json_string(candidate, unescape_quotes=unescape)
            try:
                parsed = json.loads(cleaned)
                if isinstance(parsed, str):
                    # Payload was JSON encoded twice.
                    parsed = json.loads(parsed)
            except json.JSONDecodeError as err:
                last_error = err
            else:
                return parsed
        raise last_error

    @staticmethod
    def _markdown_section(title: str, value: Any) -> str:
        """Render *value* under a level-4 heading, one bullet per list item."""
        if isinstance(value, list):
            body = "\n- ".join(str(item) for item in value)
        else:
            body = str(value)
        return f"\n#### {title}\n- {body}\n"

    def _parse_response(self, response_content: str) -> dict:
        """Parse the LLM response with enhanced error handling.

        Each JSON candidate found in the reply is decoded in turn; the first
        one that yields an object wins. If none does, the fields are pulled
        out with regular expressions instead.

        Returns:
            A dict with the keys ``initial_prompt_evaluation``,
            ``refined_prompt``, ``explanation_of_refinements`` and
            ``response_content``. On an unexpected failure the dict produced
            by ``_create_error_dict`` is returned.
        """
        try:
            candidates = self._json_candidates(response_content)
            for candidate in candidates:
                try:
                    parsed_json = self._loads_lenient(candidate)
                except json.JSONDecodeError:
                    continue
                if not isinstance(parsed_json, dict):
                    continue
                return {
                    "initial_prompt_evaluation": self._markdown_section(
                        "Original prompt analysis",
                        parsed_json.get("initial_prompt_evaluation", ""),
                    ),
                    "refined_prompt": parsed_json.get("refined_prompt", ""),
                    "explanation_of_refinements": self._markdown_section(
                        "Refinement Explanation",
                        parsed_json.get("explanation_of_refinements", ""),
                    ),
                    "response_content": parsed_json,
                }
            if candidates:
                return self._parse_with_regex(self._clean_json_string(candidates[0]))
            return self._parse_with_regex(response_content)
        except Exception as e:
            print(f"Error parsing response: {str(e)}")
            return self._create_error_dict(str(e))

    def _parse_with_regex(self, content: str) -> dict:
        """Parse content using regex when JSON parsing fails.

        ``explanation_of_refinements`` is read as a list when it is written
        as a bracketed array, otherwise as a quoted string. Missing fields
        come back as empty strings, and the input is preserved under
        ``response_content["raw_content"]``.
        """
        output = {}
        refinements_match = re.search(
            r'"explanation_of_refinements":\s*\[(.*?)\]', content, re.DOTALL
        )
        if refinements_match:
            refinements_str = refinements_match.group(1)
            output["explanation_of_refinements"] = [
                item.strip().strip('"').strip("'").replace('•', '-')
                for item in re.findall(r'[•"]([^"•]+)[•"]', refinements_str)
            ]
        else:
            pattern = r'"explanation_of_refinements":\s*"(.*?)"(?:,|\})'
            match = re.search(pattern, content, re.DOTALL)
            output["explanation_of_refinements"] = match.group(1).strip() if match else ""
        for key in ["initial_prompt_evaluation", "refined_prompt"]:
            pattern = rf'"{key}":\s*"(.*?)"(?:,|\}})'
            match = re.search(pattern, content, re.DOTALL)
            output[key] = match.group(1).strip() if match else ""
        output["response_content"] = {"raw_content": content}
        return output

    def _create_error_dict(self, error_message: str) -> dict:
        """Create a standardized error response dictionary."""
        return {
            "initial_prompt_evaluation": f"Error parsing response: {error_message}",
            "refined_prompt": "",
            "explanation_of_refinements": "",
            "response_content": {"error": error_message}
        }

    def automatic_metaprompt(self, prompt: str) -> Tuple[str, str]:
        """Automatically select the most appropriate metaprompt.

        Sends the router template (with *prompt* substituted in) to the model
        and decodes the JSON recommendation from the reply.

        Returns:
            ``(markdown_analysis, recommended_key)``. On any failure the
            first element is an ``"Error in automatic metaprompt: ..."``
            message and the key is an empty string.
        """
        try:
            router_messages = [
                {
                    "role": "system",
                    "content": "You are an AI Prompt Selection Assistant that helps choose the most appropriate metaprompt based on the user's query."
                },
                {
                    "role": "user",
                    "content": metaprompt_router.replace("[Insert initial prompt here]", prompt)
                }
            ]
            router_response = self.client.chat_completion(
                model=prompt_refiner_model,
                messages=router_messages,
                max_tokens=3000,
                temperature=0.2
            )
            router_content = router_response.choices[0].message.content.strip()
            candidates = self._json_candidates(router_content)
            if not candidates:
                raise ValueError("No JSON found in router response")
            last_error = None
            for candidate in candidates:
                try:
                    router_result = json.loads(candidate)
                except json.JSONDecodeError as err:
                    last_error = err
                else:
                    break
            else:
                # Every candidate failed to decode; report the last failure.
                raise last_error
            recommended = router_result["recommended_metaprompt"]
            recommended_key = recommended["key"]
            metaprompt_analysis = (
                "\n#### Selected MetaPrompt\n"
                f"- **Primary Choice**: {recommended['name']}\n"
                f"- *Description*: {recommended['description']}\n"
                f"- *Why This Choice*: {recommended['explanation']}\n"
                f"- *Similar Sample*: {recommended['similar_sample']}\n"
                f"- *Customized Sample*: {recommended['customized_sample']}\n"
                "#### Alternative Option\n"
                f"- **Secondary Choice**: {router_result['alternative_recommendation']['name']}\n"
                f"- *Why Consider This*: {router_result['alternative_recommendation']['explanation']}\n"
            )
            return metaprompt_analysis, recommended_key
        except Exception as e:
            return f"Error in automatic metaprompt: {str(e)}", ""

    def refine_prompt(self, prompt: str, meta_prompt_choice: str) -> Tuple[str, str, str, dict]:
        """Refine the given prompt using the selected meta prompt.

        Returns:
            ``(evaluation, refined_prompt, explanation, details)`` where
            *details* merges the original prompt, the chosen meta prompt and
            the validated response fields. On any failure the tuple is
            ``("Error: ...", "", "", {})``.
        """
        try:
            selected_meta_prompt = self.meta_prompts.get(meta_prompt_choice)
            if selected_meta_prompt is None:
                # Fail with a readable message instead of an AttributeError
                # raised by calling .replace on None.
                raise ValueError(f"Unknown meta prompt choice: {meta_prompt_choice!r}")
            messages = [
                {
                    "role": "system",
                    "content": 'You are an expert at refining and extending prompts.'
                },
                {
                    "role": "user",
                    "content": selected_meta_prompt.replace("[Insert initial prompt here]", prompt)
                }
            ]
            response = self.client.chat_completion(
                model=prompt_refiner_model,
                messages=messages,
                max_tokens=3000,
                temperature=0.8
            )
            result = self._parse_response(response.choices[0].message.content.strip())
            llm_response = LLMResponse(**result)
            llm_response_dico = {
                'initial_prompt': prompt,
                'meta_prompt': meta_prompt_choice,
                **llm_response.dict(),
            }
            return (
                llm_response.initial_prompt_evaluation,
                llm_response.refined_prompt,
                llm_response.explanation_of_refinements,
                llm_response_dico
            )
        except Exception as e:
            return (
                f"Error: {str(e)}",
                "",
                "",
                {}
            )

    def apply_prompt(self, prompt: str, model: str) -> str:
        """Apply formatting to the prompt using the specified model.

        Returns:
            The stripped model reply, or an ``"Error: ..."`` message when an
            argument is empty or the call fails.
        """
        try:
            if not prompt or not model:
                return "Error: Prompt and model are required"
            messages = [
                {
                    "role": "system",
                    "content": "You are a markdown formatting expert."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ]
            response = self.client.chat_completion(
                model=model,
                messages=messages,
                max_tokens=3000,
                temperature=0.8,
                stream=False  # non-streaming mode
            )
            # The reply can be read directly because stream=False.
            return response.choices[0].message.content.strip()
        except Exception as e:
            return f"Error: {str(e)}"