Spaces:
Runtime error
Runtime error
import json | |
import logging | |
import re | |
from typing import Any, TypedDict | |
import httpx | |
from .utils import env_str | |
class WorkflowData(TypedDict):
    """Editorial workflow state embedded in a :class:`Document`.

    Every key is required. The role keys hold lists of strings
    (presumably user identifiers -- confirm against the data source).
    """

    status: str
    # People assigned to each editorial role.
    writers: list[str]
    editors: list[str]
    proofers: list[str]
    reviewers: list[str]
    # NOTE(review): stored as a string; the exact date format is not
    # visible in this module.
    proofingDeadline: str
class Document(TypedDict):
    """Shape of a document record as returned by ``DocumentManager``.

    All keys are declared as required. The values are whatever the
    backing store returns as JSON; nothing in this module validates a
    response against this schema.
    """

    # Store-level identity and revision.
    _id: str
    _rev: str

    # Basic classification.
    type: str
    mimetype: str
    title: str
    language: str
    workflowData: WorkflowData
    path: str
    name: str

    # Audit fields.
    # NOTE(review): the integer fields are presumably timestamps; the
    # unit (seconds vs. milliseconds) is not visible here -- confirm.
    created: int
    creator: str
    lastPublished: int
    firstPublished: int
    modified: int
    modifier: str
    published: int

    # Content and associated assets.
    authors: list[str]
    content: str
    contentAssets: list[str]
    featuredImages: list[str]
    keywords: list[str]
    topics: list[str]
    relatedAssets: list[str]
    comments: bool
    campaignConfigs: list[Any]
    order: int
    overline: str
    translatedFrom: str

    # Social-sharing overrides; element shape is not specified here.
    socialTitles: list[Any]
    socialDescriptions: list[Any]
    socialFeaturedImages: list[Any]

    underline: str
    template: str
    description: str
    suggestedImages: list[str]
    publisher: str
class DocumentManager:
    """Async accessor for documents served by the ``DOCS_URL`` HTTP endpoint.

    A document can be looked up by ID, by path (through the view whose
    location is given by ``DOCS_PATH_VIEW``), or from a free-form string
    that contains either a UUID or an ``.html`` path/URL.

    All lookup methods are best-effort: any failure is logged and
    reported to the caller as ``None`` rather than raised.
    """

    def __init__(self) -> None:
        """Build the HTTP client and read the path-view location from the environment."""
        self.client = self.make_client()
        self.path_view = env_str("DOCS_PATH_VIEW")

    def make_client(self) -> httpx.AsyncClient:
        """Return an ``httpx.AsyncClient`` configured from ``DOCS_URL`` and ``DOCS_AUTH``.

        ``DOCS_AUTH`` is placed verbatim after ``Basic`` in the
        ``Authorization`` header, so it is presumably the already
        encoded credential string -- confirm against deployment config.
        """
        base_url = env_str("DOCS_URL")
        auth = env_str("DOCS_AUTH")
        headers = {"Authorization": f"Basic {auth}"}
        client = httpx.AsyncClient(base_url=base_url, headers=headers)
        return client

    async def get_doc_by_id(self, doc_id: str) -> Document | None:
        """Fetch the document whose ID is ``doc_id``.

        Returns:
            The decoded JSON body, or ``None`` when the server answers
            404 or when any error occurs (the error is logged).
        """
        try:
            # The ID is used as the request path relative to the client's base URL.
            response = await self.client.get(doc_id)
            if response.status_code == 404:
                # A missing document is an expected outcome, not an error.
                return None
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logging.error("Error fetching document by ID", exc_info=e)
            return None

    async def get_doc_by_path(self, path: str) -> Document | None:
        """Fetch the document registered under ``path`` via the path view.

        Returns:
            The ``doc`` member of the first returned row, or ``None``
            when the view returns no rows or when any error occurs (the
            error is logged).
        """
        try:
            params = {
                "limit": "1",
                # The view key is sent JSON-encoded (i.e. as a quoted string).
                "key": json.dumps(path),
                "include_docs": "true",
            }
            response = await self.client.get(self.path_view, params=params)
            response.raise_for_status()
            data = response.json()
            rows = data["rows"]
            if not rows:
                return None
            return rows[0]["doc"]
        except Exception as e:
            logging.error("Error fetching document by path", exc_info=e)
            return None

    async def get_doc(self, id_or_path: str) -> Document | None:
        """Resolve a free-form reference (ID, path, or URL) to a document.

        Every UUID found in ``id_or_path`` is tried in order; the first
        one that yields a document wins. If none does, the string is
        interpreted as an ``.html`` path or URL and looked up by path.

        Returns:
            The matching document, or ``None`` when nothing is found.
        """
        uuids = extract_doc_ids(id_or_path)
        for uuid in uuids:
            doc = await self.get_doc_by_id(uuid)
            if doc:
                return doc
        path = extract_doc_path(id_or_path)
        if path:
            return await self.get_doc_by_path(path)
        return None
# Matches lowercase, hyphenated UUIDs whose version digit is 1-5 and whose
# variant digit is 8, 9, a or b. Uppercase hex digits are not matched.
UUID_PATTERN = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}")


def extract_doc_ids(s: str) -> list[str]:
    """Return every UUID found in ``s``, in order of appearance.

    The UUIDs may be embedded anywhere in the string (for example at
    the end of a URL slug). An empty list is returned when there is
    no match.
    """
    return UUID_PATTERN.findall(s)
def extract_doc_path(s: str) -> str | None: | |
if not s.endswith(".html"): | |
return None | |
if s.startswith("/"): | |
return s | |
if "://" in s: | |
s = s.split("://", 1)[1] | |
if "/" in s: | |
return "/" + s.split("/", 1)[1] | |
return None | |
# Shared module-level instance. It is constructed at import time, so
# DocumentManager.__init__ (which reads its settings through env_str)
# runs as soon as this module is imported.
document_manager = DocumentManager()
if __name__ == "__main__":
    # Manual smoke test: resolve one reference and print the document.
    import asyncio

    from dotenv import load_dotenv

    async def main() -> None:
        """Look up a sample reference and print the result as indented JSON."""
        db = DocumentManager()
        try:
            # Alternative lookups, kept for manual experimentation:
            # result = await db.get_doc_by_id("b7fdc644-5b24-40ae-b489-37b3fc0c5541")
            # result = await db.get_doc_by_path("/en/articles/2024/11/28/slci-n28.html")
            # result = await db.get_doc("https://www.cnn.com/en/articles/2024/11/28/slci-n28.html")
            result = await db.get_doc("https://bbc.com/news/the-2024-us-elections-efb37bf1-16bb-4bbb-88ce-4273cf657c11")
            print(json.dumps(result, indent=2))
        finally:
            # Release the client's connections before the event loop shuts down.
            await db.client.aclose()

    # NOTE(review): the module-level DocumentManager() above this block is
    # constructed before load_dotenv() runs. If env_str requires the
    # variables to be set, they must already be in the process
    # environment -- confirm.
    load_dotenv()
    asyncio.run(main())