v2 / modules /database.py
AIdeaText's picture
Update modules/database.py
cba69cb verified
raw
history blame
No virus
7.19 kB
# database.py
# database.py
import logging
import os
from azure.cosmos import CosmosClient
from azure.cosmos.exceptions import CosmosHttpResponseError
from pymongo import MongoClient
import certifi
from datetime import datetime
import io
import base64
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Variables globales para Cosmos DB SQL API
cosmos_client = None
user_database = None
user_container = None
# Variables globales para Cosmos DB MongoDB API
mongo_client = None
mongo_db = None
analysis_collection = None
#####################################################################################33
def initialize_cosmos_sql_connection():
global cosmos_client, user_database, user_container
try:
cosmos_endpoint = os.environ.get("COSMOS_ENDPOINT")
cosmos_key = os.environ.get("COSMOS_KEY")
if not cosmos_endpoint or not cosmos_key:
raise ValueError("Las variables de entorno COSMOS_ENDPOINT y COSMOS_KEY deben estar configuradas")
cosmos_client = CosmosClient(cosmos_endpoint, cosmos_key)
user_database = cosmos_client.get_database_client("user_database")
user_container = user_database.get_container_client("users")
logger.info("Conexión a Cosmos DB SQL API exitosa")
return True
except Exception as e:
logger.error(f"Error al conectar con Cosmos DB SQL API: {str(e)}")
return False
############################################################################################3
def initialize_mongodb_connection():
global mongo_client, mongo_db, analysis_collection
try:
cosmos_mongodb_connection_string = os.getenv("MONGODB_CONNECTION_STRING")
if not cosmos_mongodb_connection_string:
logger.error("La variable de entorno MONGODB_CONNECTION_STRING no está configurada")
return False
mongo_client = MongoClient(cosmos_mongodb_connection_string,
tls=True,
tlsCAFile=certifi.where(),
retryWrites=False,
serverSelectionTimeoutMS=5000,
connectTimeoutMS=10000,
socketTimeoutMS=10000)
mongo_client.admin.command('ping')
mongo_db = mongo_client['aideatext_db']
analysis_collection = mongo_db['text_analysis']
logger.info("Conexión a Cosmos DB MongoDB API exitosa")
return True
except Exception as e:
logger.error(f"Error al conectar con Cosmos DB MongoDB API: {str(e)}", exc_info=True)
return False
#######################################################################################################
# Funciones para Cosmos DB SQL API (manejo de usuarios)
def get_user(username):
try:
query = f"SELECT * FROM c WHERE c.id = '{username}'"
items = list(user_container.query_items(query=query, enable_cross_partition_query=True))
return items[0] if items else None
except Exception as e:
logger.error(f"Error al obtener usuario {username}: {str(e)}")
return None
def create_user(user_data):
try:
user_container.create_item(body=user_data)
return True
except Exception as e:
logger.error(f"Error al crear usuario: {str(e)}")
return False
################################################################################
# Funciones para Cosmos DB MongoDB API (análisis de texto)
def get_student_data(username):
if analysis_collection is None:
logger.error("La conexión a MongoDB no está inicializada")
return None
try:
logger.info(f"Buscando datos para el usuario: {username}")
# Obtener todos los documentos para el usuario sin ordenar
cursor = analysis_collection.find({"username": username})
# Contar documentos
count = analysis_collection.count_documents({"username": username})
logger.info(f"Número de documentos encontrados para {username}: {count}")
if count == 0:
logger.info(f"No se encontraron datos para el usuario {username}")
return None
# Formatear los datos
formatted_data = {
"username": username,
"entries": [],
"entries_count": count,
"word_count": {}
}
for entry in cursor:
formatted_entry = {
"timestamp": entry["timestamp"],
"text": entry["text"],
"word_count": entry.get("word_count", {}),
"arc_diagrams": entry.get("arc_diagrams", []),
"network_diagram": entry.get("network_diagram", "")
}
formatted_data["entries"].append(formatted_entry)
# Agregar conteo de palabras
for category, count in formatted_entry["word_count"].items():
if category in formatted_data["word_count"]:
formatted_data["word_count"][category] += count
else:
formatted_data["word_count"][category] = count
# Ordenar las entradas por timestamp después de obtenerlas
formatted_data["entries"].sort(key=lambda x: x["timestamp"], reverse=True)
# Convertir los timestamps a formato ISO después de ordenar
for entry in formatted_data["entries"]:
entry["timestamp"] = entry["timestamp"].isoformat()
logger.info(f"Datos formateados para {username}: {formatted_data}")
return formatted_data
except Exception as e:
logger.error(f"Error al obtener datos del estudiante {username}: {str(e)}")
return None
#######################################################################################################
def store_analysis_result(username, text, repeated_words, arc_diagrams, network_diagram):
if analysis_collection is None:
logger.error("La conexión a MongoDB no está inicializada")
return False
try:
buffer = io.BytesIO()
network_diagram.savefig(buffer, format='png')
buffer.seek(0)
network_diagram_base64 = base64.b64encode(buffer.getvalue()).decode()
word_count = {}
for word, color in repeated_words.items():
category = color # Asumiendo que 'color' es la categoría gramatical
word_count[category] = word_count.get(category, 0) + 1
analysis_document = {
'username': username,
'timestamp': datetime.utcnow(),
'text': text,
'word_count': word_count,
'arc_diagrams': arc_diagrams,
'network_diagram': network_diagram_base64
}
result = analysis_collection.insert_one(analysis_document)
logger.info(f"Análisis guardado con ID: {result.inserted_id} para el usuario: {username}")
return True
except Exception as e:
logger.error(f"Error al guardar el análisis para el usuario {username}: {str(e)}")
return False