# database.py # database.py import logging import os from azure.cosmos import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError from pymongo import MongoClient import certifi from datetime import datetime import io import base64 logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # Variables globales para Cosmos DB SQL API cosmos_client = None user_database = None user_container = None # Variables globales para Cosmos DB MongoDB API mongo_client = None mongo_db = None analysis_collection = None #####################################################################################33 def initialize_cosmos_sql_connection(): global cosmos_client, user_database, user_container try: cosmos_endpoint = os.environ.get("COSMOS_ENDPOINT") cosmos_key = os.environ.get("COSMOS_KEY") if not cosmos_endpoint or not cosmos_key: raise ValueError("Las variables de entorno COSMOS_ENDPOINT y COSMOS_KEY deben estar configuradas") cosmos_client = CosmosClient(cosmos_endpoint, cosmos_key) user_database = cosmos_client.get_database_client("user_database") user_container = user_database.get_container_client("users") logger.info("Conexión a Cosmos DB SQL API exitosa") return True except Exception as e: logger.error(f"Error al conectar con Cosmos DB SQL API: {str(e)}") return False ############################################################################################3 def initialize_mongodb_connection(): global mongo_client, mongo_db, analysis_collection try: cosmos_mongodb_connection_string = os.getenv("MONGODB_CONNECTION_STRING") if not cosmos_mongodb_connection_string: logger.error("La variable de entorno MONGODB_CONNECTION_STRING no está configurada") return False mongo_client = MongoClient(cosmos_mongodb_connection_string, tls=True, tlsCAFile=certifi.where(), retryWrites=False, serverSelectionTimeoutMS=5000, connectTimeoutMS=10000, socketTimeoutMS=10000) mongo_client.admin.command('ping') mongo_db = mongo_client['aideatext_db'] analysis_collection = mongo_db['text_analysis'] logger.info("Conexión a Cosmos DB MongoDB API exitosa") return True except Exception as e: logger.error(f"Error al conectar con Cosmos DB MongoDB API: {str(e)}", exc_info=True) return False ####################################################################################################### # Funciones para Cosmos DB SQL API (manejo de usuarios) def get_user(username): try: query = f"SELECT * FROM c WHERE c.id = '{username}'" items = list(user_container.query_items(query=query, enable_cross_partition_query=True)) return items[0] if items else None except Exception as e: logger.error(f"Error al obtener usuario {username}: {str(e)}") return None def create_user(user_data): try: user_container.create_item(body=user_data) return True except Exception as e: logger.error(f"Error al crear usuario: {str(e)}") return False ################################################################################ # Funciones para Cosmos DB MongoDB API (análisis de texto) def get_student_data(username): if analysis_collection is None: logger.error("La conexión a MongoDB no está inicializada") return None try: logger.info(f"Buscando datos para el usuario: {username}") # Obtener todos los documentos para el usuario sin ordenar cursor = analysis_collection.find({"username": username}) # Contar documentos count = analysis_collection.count_documents({"username": username}) logger.info(f"Número de documentos encontrados para {username}: {count}") if count == 0: logger.info(f"No se encontraron datos para el usuario {username}") return None # Formatear los datos formatted_data = { "username": username, "entries": [], "entries_count": count, "word_count": {} } for entry in cursor: formatted_entry = { "timestamp": entry["timestamp"], "text": entry["text"], "word_count": entry.get("word_count", {}), "arc_diagrams": entry.get("arc_diagrams", []), "network_diagram": entry.get("network_diagram", "") } formatted_data["entries"].append(formatted_entry) # Agregar conteo de palabras for category, count in formatted_entry["word_count"].items(): if category in formatted_data["word_count"]: formatted_data["word_count"][category] += count else: formatted_data["word_count"][category] = count # Ordenar las entradas por timestamp después de obtenerlas formatted_data["entries"].sort(key=lambda x: x["timestamp"], reverse=True) # Convertir los timestamps a formato ISO después de ordenar for entry in formatted_data["entries"]: entry["timestamp"] = entry["timestamp"].isoformat() logger.info(f"Datos formateados para {username}: {formatted_data}") return formatted_data except Exception as e: logger.error(f"Error al obtener datos del estudiante {username}: {str(e)}") return None ####################################################################################################### def store_analysis_result(username, text, repeated_words, arc_diagrams, network_diagram): if analysis_collection is None: logger.error("La conexión a MongoDB no está inicializada") return False try: buffer = io.BytesIO() network_diagram.savefig(buffer, format='png') buffer.seek(0) network_diagram_base64 = base64.b64encode(buffer.getvalue()).decode() word_count = {} for word, color in repeated_words.items(): category = color # Asumiendo que 'color' es la categoría gramatical word_count[category] = word_count.get(category, 0) + 1 analysis_document = { 'username': username, 'timestamp': datetime.utcnow(), 'text': text, 'word_count': word_count, 'arc_diagrams': arc_diagrams, 'network_diagram': network_diagram_base64 } result = analysis_collection.insert_one(analysis_document) logger.info(f"Análisis guardado con ID: {result.inserted_id} para el usuario: {username}") return True except Exception as e: logger.error(f"Error al guardar el análisis para el usuario {username}: {str(e)}") return False