cc-api / json_parser.py
Severian's picture
Update json_parser.py
a47d087 verified
from logger_config import setup_logger
from typing import Dict, Any, Optional, List, Union, Tuple
from dataclasses import dataclass
from enum import Enum
import json
import re
import traceback
logger = setup_logger()
class MessageState:
def __init__(self):
self.buffer = ""
self.is_complete = False
self.tool_outputs = []
self.citations = []
self.metadata = {}
self.processed_events = set()
self.current_message_id = None
class SSEParser:
def __init__(self):
self.logger = setup_logger("sse_parser")
self.current_message = MessageState()
def _extract_json_content(self, data: str) -> Optional[str]:
"""Extract JSON content from SSE data line"""
if "data:" in data:
return data.split("data:", 1)[1].strip()
return None
def _is_valid_json(self, content: str) -> bool:
"""Check if content is valid JSON"""
try:
json.loads(content)
return True
except json.JSONDecodeError:
return False
def _clean_mermaid_content(self, content: str) -> Optional[str]:
"""Clean and extract mermaid diagram content"""
try:
self.logger.debug(f"Starting mermaid content cleaning. Input type: {type(content)}")
self.logger.debug(f"Initial content: {content[:200]}...") # Log first 200 chars
# Handle tool output format
if isinstance(content, dict) and "tool_output" in content:
self.logger.debug("Found tool_output in dict, extracting...")
content = content["tool_output"]
self.logger.debug(f"Extracted tool_output: {content[:200]}...")
# Remove tool response prefix/suffix if present
if isinstance(content, str):
self.logger.debug("Processing string content...")
if "tool response:" in content:
self.logger.debug("Found 'tool response:' prefix, removing...")
content = content.split("tool response:", 1)[1].strip()
if content.endswith('.'):
content = content[:-1]
self.logger.debug(f"After prefix/suffix removal: {content[:200]}...")
# Parse JSON if present
try:
if isinstance(content, str):
self.logger.debug("Attempting to parse content as JSON...")
data = json.loads(content)
self.logger.debug(f"JSON parsed successfully. Keys: {data.keys()}")
else:
data = content
self.logger.debug(f"Using content as data directly. Type: {type(data)}")
# Handle different mermaid output formats
if "mermaid_output" in data:
self.logger.debug("Found mermaid_output format")
content = data["mermaid_output"]
elif "mermaid_diagram" in data:
self.logger.debug("Found mermaid_diagram format")
content = data["mermaid_diagram"]
# If content is still JSON string, parse again
if isinstance(content, str) and content.startswith('{'):
self.logger.debug("Content still appears to be JSON, attempting second parse...")
try:
data = json.loads(content)
if "mermaid_output" in data:
content = data["mermaid_output"]
self.logger.debug("Extracted mermaid_output from second parse")
elif "mermaid_diagram" in data:
content = data["mermaid_diagram"]
self.logger.debug("Extracted mermaid_diagram from second parse")
except Exception as e:
self.logger.debug(f"Second JSON parse failed: {str(e)}")
except json.JSONDecodeError as e:
self.logger.debug(f"Initial JSON parse failed: {str(e)}")
# Clean up markdown formatting
if isinstance(content, str):
self.logger.debug("Cleaning markdown formatting...")
content = content.replace("```mermaid\n", "").replace("\n```", "")
content = content.strip()
# Remove any remaining JSON artifacts
if content.startswith('{'):
self.logger.debug("Attempting to clean remaining JSON artifacts...")
try:
data = json.loads(content)
if isinstance(data, dict):
content = next(iter(data.values()))
self.logger.debug("Extracted value from remaining JSON")
except Exception as e:
self.logger.debug(f"Final JSON cleanup failed: {str(e)}")
self.logger.debug(f"Final cleaned content: {content[:200]}...")
return content
self.logger.warning("Content not in string format after processing")
return None
except Exception as e:
self.logger.error(f"Error cleaning mermaid content: {str(e)}")
self.logger.error(f"Original content: {content}")
self.logger.error(f"Stack trace: {traceback.format_exc()}")
return None
def parse_sse_event(self, data: str) -> Optional[Dict]:
"""Parse SSE event data and format for frontend consumption"""
try:
self.logger.debug(f"Parsing SSE event. Raw data length: {len(data)}")
# Clean up the data format - remove extra data: prefixes
data = data.replace('data: data:', 'data:').replace('\r\n', '\n')
# Extract JSON content from SSE data
json_content = self._extract_json_content(data)
if not json_content:
self.logger.debug("No JSON content found in SSE data")
return None
# Handle text-wrapped JSON
if json_content.startswith('{"text":'):
try:
wrapper = json.loads(json_content)
json_content = wrapper.get("text", "")
except:
pass
self.logger.debug(f"Cleaned JSON content: {json_content[:200]}...")
# Parse XML content if present
if '<agent_response>' in json_content:
return self._parse_xml_response(json_content)
# Parse JSON content
try:
parsed_data = json.loads(json_content)
self.logger.debug(f"Parsed data keys: {parsed_data.keys()}")
# Handle tool outputs
if any(key in parsed_data for key in ['mermaid_output', 'mermaid_diagram']):
return {
"type": "tool_output",
"tool": "mermaid",
"content": self._clean_mermaid_content(json_content)
}
return parsed_data
except json.JSONDecodeError:
self.logger.debug("Failed to parse as JSON, treating as raw content")
return {
"type": "message",
"content": json_content
}
except Exception as e:
self.logger.error(f"Parse error: {str(e)}")
self.logger.error(f"Raw data: {data}")
self.logger.error(f"Stack trace: {traceback.format_exc()}")
return None
def _parse_xml_response(self, content: str) -> Optional[Dict]:
"""Parse XML response format"""
try:
# Extract message content
message_match = re.search(r'<message>(.*?)</message>', content, re.DOTALL)
if message_match:
return {
"type": "message",
"content": message_match.group(1).strip()
}
# Extract tool output content
tool_match = re.search(r'<tool_output.*?>(.*?)</tool_output>', content, re.DOTALL)
if tool_match:
tool_content = tool_match.group(1)
if 'mermaid' in content.lower():
return {
"type": "tool_output",
"tool": "mermaid",
"content": self._clean_mermaid_content(tool_content)
}
return None
except Exception as e:
self.logger.error(f"XML parse error: {str(e)}")
self.logger.error(f"Content: {content}")
return None
def _process_observation(self, data: Dict) -> Dict:
"""Process observation content with special handling for tool outputs"""
try:
observation = data.get("observation")
if observation and isinstance(observation, str):
# Handle tool-specific content
if "mermaid_diagram" in observation:
cleaned_content = self.clean_mermaid_content(observation)
if cleaned_content not in [t.get("content") for t in self.current_message.tool_outputs]:
self.current_message.tool_outputs.append({
"type": "mermaid_diagram",
"content": cleaned_content
})
data["observation"] = json.dumps({
"mermaid_diagram": cleaned_content
})
elif self._is_valid_json(observation):
# Handle other tool outputs
try:
tool_data = json.loads(observation)
if isinstance(tool_data, dict):
for tool_name, tool_output in tool_data.items():
if tool_output not in [t.get("content") for t in self.current_message.tool_outputs]:
self.current_message.tool_outputs.append({
"type": tool_name,
"content": tool_output
})
except json.JSONDecodeError:
pass
except Exception as e:
self.logger.error(f"Error processing observation: {str(e)}")
return data
def _handle_message_end(self, data: Dict) -> None:
"""Handle message end event and cleanup state"""
self.current_message.citations = data.get("retriever_resources", [])
self.current_message.metadata = data.get("metadata", {})
self.current_message.metadata["tool_outputs"] = self.current_message.tool_outputs
self.current_message.is_complete = True
def clean_mermaid_content(self, content: str) -> str:
"""Clean and format mermaid diagram content"""
try:
# Remove markdown and JSON formatting
content = re.sub(r'```mermaid\s*|\s*```', '', content)
content = re.sub(r'tool response:.*?{', '{', content)
content = re.sub(r'}\s*\.$', '}', content)
# Parse JSON if present
if content.strip().startswith('{'):
try:
content_dict = json.loads(content)
if isinstance(content_dict, dict):
content = content_dict.get("mermaid_diagram", content)
except:
pass
return content.strip()
except Exception as e:
self.logger.error(f"Error cleaning mermaid content: {e}")
return content