Spaces:
Running
Running
| import json | |
| import re | |
| from typing import Optional, Dict, Any, Union, List, Tuple | |
| from pydantic import BaseModel, Field, validator | |
| from huggingface_hub import InferenceClient | |
| from huggingface_hub.errors import HfHubHTTPError | |
| from variables import * | |
| from metaprompt_router import metaprompt_router | |
| class LLMResponse(BaseModel): | |
| initial_prompt_evaluation: str = Field(..., description="Evaluation of the initial prompt") | |
| refined_prompt: str = Field(..., description="The refined version of the prompt") | |
| explanation_of_refinements: Union[str, List[str]] = Field(..., description="Explanation of the refinements made") | |
| response_content: Optional[Union[Dict[str, Any], str]] = Field(None, description="Raw response content") | |
| def validate_response_content(cls, v): | |
| if isinstance(v, str): | |
| try: | |
| return json.loads(v) | |
| except json.JSONDecodeError: | |
| return {"raw_content": v} | |
| return v | |
| def clean_text_fields(cls, v): | |
| if isinstance(v, str): | |
| return v.strip().replace('\\n', '\n').replace('\\"', '"') | |
| return v | |
| def clean_refinements(cls, v): | |
| if isinstance(v, str): | |
| return v.strip().replace('\\n', '\n').replace('\\"', '"') | |
| elif isinstance(v, list): | |
| return [item.strip().replace('\\n', '\n').replace('\\"', '"').replace('•', '-') | |
| for item in v if isinstance(item, str)] | |
| return v | |
| class PromptRefiner: | |
| def __init__(self, api_token: str, meta_prompts: dict,metaprompt_explanations: dict): | |
| self.client = InferenceClient(token=api_token, timeout=120) | |
| self.meta_prompts = meta_prompts | |
| self.metaprompt_explanations=metaprompt_explanations | |
| def _clean_json_string(self, content: str) -> str: | |
| """Clean and prepare JSON string for parsing.""" | |
| content = content.replace('•', '-') # Replace bullet points | |
| content = re.sub(r'\s+', ' ', content) # Normalize whitespace | |
| content = content.replace('\\"', '"') # Fix escaped quotes | |
| return content.strip() | |
| def _parse_response(self, response_content: str) -> dict: | |
| """Parse the LLM response with enhanced error handling.""" | |
| try: | |
| # Extract content between <json> tags | |
| json_match = re.search(r'<json>\s*(.*?)\s*</json>', response_content, re.DOTALL) | |
| if json_match: | |
| json_str = self._clean_json_string(json_match.group(1)) | |
| try: | |
| # Try parsing the cleaned JSON | |
| parsed_json = json.loads(json_str) | |
| if isinstance(parsed_json, str): | |
| parsed_json = json.loads(parsed_json) | |
| return { | |
| "initial_prompt_evaluation": parsed_json.get("initial_prompt_evaluation", ""), | |
| "refined_prompt": parsed_json.get("refined_prompt", ""), | |
| "explanation_of_refinements": parsed_json.get("explanation_of_refinements", ""), | |
| "response_content": parsed_json if isinstance(parsed_json, dict) else {"raw_content": parsed_json} | |
| } | |
| except json.JSONDecodeError: | |
| # If JSON parsing fails, try regex parsing | |
| return self._parse_with_regex(json_str) | |
| # If no JSON tags found, try regex parsing | |
| return self._parse_with_regex(response_content) | |
| except Exception as e: | |
| print(f"Error parsing response: {str(e)}") | |
| print(f"Raw content: {response_content}") | |
| return self._create_error_dict(str(e)) | |
| def _parse_with_regex(self, content: str) -> dict: | |
| """Parse content using regex when JSON parsing fails.""" | |
| output = {} | |
| # Handle explanation_of_refinements list format | |
| refinements_match = re.search(r'"explanation_of_refinements":\s*$(.*?)$', content, re.DOTALL) | |
| if refinements_match: | |
| refinements_str = refinements_match.group(1) | |
| refinements = [ | |
| item.strip().strip('"').strip("'").replace('•', '-') | |
| for item in re.findall(r'[•"]([^"•]+)[•"]', refinements_str) | |
| ] | |
| output["explanation_of_refinements"] = refinements | |
| else: | |
| # Try single string format | |
| pattern = r'"explanation_of_refinements":\s*"(.*?)"(?:,|\})' | |
| match = re.search(pattern, content, re.DOTALL) | |
| output["explanation_of_refinements"] = match.group(1).strip() if match else "" | |
| # Extract other fields | |
| for key in ["initial_prompt_evaluation", "refined_prompt"]: | |
| pattern = rf'"{key}":\s*"(.*?)"(?:,|\}})' | |
| match = re.search(pattern, content, re.DOTALL) | |
| output[key] = match.group(1).strip() if match else "" | |
| # Store the original content in a structured way | |
| output["response_content"] = {"raw_content": content} | |
| return output | |
| def _create_error_dict(self, error_message: str) -> dict: | |
| """Create a standardized error response dictionary.""" | |
| return { | |
| "initial_prompt_evaluation": f"Error parsing response: {error_message}", | |
| "refined_prompt": "", | |
| "explanation_of_refinements": "", | |
| "response_content": {"error": error_message} | |
| } | |
| def automatic_metaprompt(self, prompt: str, meta_prompt_choice: str) -> Tuple[str, str, str, dict]: | |
| """Automatically select and apply the most appropriate metaprompt for the given prompt.""" | |
| try: | |
| # First, use the router to determine the best metaprompt | |
| router_messages = [ | |
| { | |
| "role": "system", | |
| "content": "You are an AI Prompt Selection Assistant that helps choose the most appropriate metaprompt based on the user's query." | |
| }, | |
| { | |
| "role": "user", | |
| "content": metaprompt_router.replace("[Insert initial prompt here]", prompt) | |
| } | |
| ] | |
| # Get router response | |
| router_response = self.client.chat_completion( | |
| model=prompt_refiner_model, | |
| messages=router_messages, | |
| max_tokens=3000, | |
| temperature=0.2 | |
| ) | |
| router_content = router_response.choices[0].message.content.strip() | |
| # Extract JSON from router response | |
| json_match = re.search(r'<json>(.*?)</json>', router_content, re.DOTALL) | |
| if not json_match: | |
| raise ValueError("No JSON found in router response") | |
| router_result = json.loads(json_match.group(1)) | |
| # Get the recommended metaprompt key | |
| recommended_key = router_result["recommended_metaprompt"]["key"] | |
| # Use the recommended metaprompt to refine the prompt | |
| selected_meta_prompt = self.meta_prompts.get(recommended_key) | |
| selected_meta_prompt_explanations = self.metaprompt_explanations.get(recommended_key) | |
| # Now use the selected metaprompt to refine the original prompt | |
| refine_messages = [ | |
| { | |
| "role": "system", | |
| "content": 'You are an expert at refining and extending prompts. Given a basic prompt, provide a more relevant and detailed prompt.' | |
| }, | |
| { | |
| "role": "user", | |
| "content": selected_meta_prompt.replace("[Insert initial prompt here]", prompt) | |
| } | |
| ] | |
| response = self.client.chat_completion( | |
| model=prompt_refiner_model, | |
| messages=refine_messages, | |
| max_tokens=3000, | |
| temperature=0.8 | |
| ) | |
| response_content = response.choices[0].message.content.strip() | |
| result = self._parse_response(response_content) | |
| try: | |
| llm_response = LLMResponse(**result) | |
| metaprompt_analysis = f""" | |
| #### Selected MetaPrompt Analysis | |
| - <span style="color: grey; font-style: italic;">**Primary Choice**</span>: *{router_result["recommended_metaprompt"]["name"]}* | |
| - *Description*: *{router_result["recommended_metaprompt"]["description"]}* | |
| - *Why This Choice*: *{router_result["recommended_metaprompt"]["explanation"]}* | |
| #### Alternative Option | |
| - <span style="color: grey; font-style: italic;">**Secondary Choice**</span>: *{router_result["alternative_recommendation"]["name"]}* | |
| - *Why Consider This*: *{router_result["alternative_recommendation"]["explanation"]}* | |
| """ | |
| return ( | |
| metaprompt_analysis, | |
| llm_response.initial_prompt_evaluation, | |
| llm_response.refined_prompt, | |
| llm_response.explanation_of_refinements, | |
| llm_response.dict() | |
| ) | |
| except Exception as e: | |
| print(f"Error creating LLMResponse: {e}") | |
| return self._create_error_response(f"Error validating response: {str(e)}") | |
| except HfHubHTTPError as e: | |
| return self._create_error_response("Model timeout. Please try again later.") | |
| except Exception as e: | |
| return self._create_error_response(f"Unexpected error: {str(e)}") | |
| def refine_prompt(self, prompt: str, meta_prompt_choice: str) -> Tuple[str, str, str, dict]: | |
| """Refine the given prompt using the selected meta prompt.""" | |
| try: | |
| selected_meta_prompt = self.meta_prompts.get(meta_prompt_choice) | |
| selected_meta_prompt_explanations = self.metaprompt_explanations.get(meta_prompt_choice) | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": 'You are an expert at refining and extending prompts. Given a basic prompt, provide a more relevant and detailed prompt.' | |
| }, | |
| { | |
| "role": "user", | |
| "content": selected_meta_prompt.replace("[Insert initial prompt here]", prompt) | |
| } | |
| ] | |
| response = self.client.chat_completion( | |
| model=prompt_refiner_model, | |
| messages=messages, | |
| max_tokens=3000, | |
| temperature=0.8 | |
| ) | |
| response_content = response.choices[0].message.content.strip() | |
| result = self._parse_response(response_content) | |
| try: | |
| llm_response = LLMResponse(**result) | |
| return ( | |
| f"- **{meta_prompt_choice}**: {selected_meta_prompt_explanations}", | |
| llm_response.initial_prompt_evaluation, | |
| llm_response.refined_prompt, | |
| llm_response.explanation_of_refinements, | |
| llm_response.dict() | |
| ) | |
| except Exception as e: | |
| print(f"Error creating LLMResponse: {e}") | |
| return self._create_error_response(f"Error validating response: {str(e)}") | |
| except HfHubHTTPError as e: | |
| return self._create_error_response("Model timeout. Please try again later.") | |
| except Exception as e: | |
| return self._create_error_response(f"Unexpected error: {str(e)}") | |
| def _create_error_response(self, error_message: str) -> Tuple[str, str, str, dict]: | |
| """Create a standardized error response tuple.""" | |
| return ( | |
| f"Error: {error_message}", | |
| f"Error: {error_message}", | |
| "The selected model is currently unavailable.", | |
| "An error occurred during processing.", | |
| {"error": error_message} | |
| ) | |
| def apply_prompt(self, prompt: str, model: str) -> str: | |
| """Apply formatting to the prompt using the specified model.""" | |
| try: | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": """You are a markdown formatting expert. Format your responses with proper spacing and structure following these rules: | |
| 1. Paragraph Spacing: | |
| - Add TWO blank lines between major sections (##) | |
| - Add ONE blank line between subsections (###) | |
| - Add ONE blank line between paragraphs within sections | |
| - Add ONE blank line before and after lists | |
| - Add ONE blank line before and after code blocks | |
| - Add ONE blank line before and after blockquotes | |
| 2. Section Formatting: | |
| # Title | |
| ## Major Section | |
| [blank line] | |
| Content paragraph 1 | |
| [blank line] | |
| Content paragraph 2 | |
| [blank line]""" | |
| }, | |
| { | |
| "role": "user", | |
| "content": prompt | |
| } | |
| ] | |
| response = self.client.chat_completion( | |
| model=model, | |
| messages=messages, | |
| max_tokens=3000, | |
| temperature=0.8, | |
| stream=True | |
| ) | |
| full_response = "" | |
| for chunk in response: | |
| if chunk.choices[0].delta.content is not None: | |
| full_response += chunk.choices[0].delta.content | |
| return full_response.replace('\n\n', '\n').strip() | |
| except Exception as e: | |
| return f"Error: {str(e)}" |