Spaces:
Runtime error
Runtime error
import logging | |
import uuid | |
from fastapi import FastAPI | |
from gistillery.base import EntriesResult, JobStatus, JobStatusResult, RequestInput | |
from gistillery.db import TABLES, get_db_cursor | |
# Module-level logger named after this module. Its level is set to DEBUG so
# that debug records sent through it are not filtered out at the logger.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# The FastAPI application object for this module.
# NOTE(review): no route decorators are visible in this view of the file;
# confirm how the functions below are registered on `app`.
app = FastAPI()
# status | |
def status() -> str:
    """Return the fixed string ``"OK"``."""
    return "OK"
def submit_job(input: RequestInput) -> str:
    """Submit a new job by inserting a job row and a matching entry row.

    A fresh hex UUID is generated and used both as ``jobs.entry_id`` and as
    ``entries.id``. The job row is written with status ``"pending"``; the
    entry row stores ``input.author`` in ``author`` and ``input.content`` in
    ``source``.

    Args:
        input: Request payload; its ``author`` and ``content`` attributes
            are read.

    Returns:
        A message of the form ``"Submitted job <id>"`` containing the full
        generated id.
    """
    # submit a new job, poor man's job queue
    job_sql = "INSERT INTO jobs (entry_id, status) VALUES (?, ?)"
    entry_sql = "INSERT INTO entries (id, author, source) VALUES (?, ?, ?)"

    entry_id = uuid.uuid4().hex
    # Only the first 8 hex characters are logged to keep the line short.
    logger.info(f"Submitting job for (_id={entry_id[:8]})")

    with get_db_cursor() as cursor:
        # Job row first, then the entry row it refers to.
        cursor.execute(job_sql, (entry_id, "pending"))
        cursor.execute(entry_sql, (entry_id, input.author, input.content))
    return f"Submitted job {entry_id}"
def check_job_status(_id: str) -> JobStatusResult:
    """Look up the job whose ``entry_id`` equals ``_id``.

    Args:
        _id: The entry id to search for in the ``jobs`` table.

    Returns:
        A ``JobStatusResult`` carrying the stored ``status`` and
        ``last_updated`` values when a row is found; otherwise one with
        ``status=JobStatus.not_found`` and ``last_updated=None``.
    """
    lookup_sql = "SELECT status, last_updated FROM jobs WHERE entry_id = ?"
    with get_db_cursor() as cursor:
        cursor.execute(lookup_sql, (_id,))
        row = cursor.fetchone()

    if row is not None:
        job_status, updated_at = row
        return JobStatusResult(id=_id, status=job_status, last_updated=updated_at)
    # fetchone() returned None: no job row matched this id.
    return JobStatusResult(id=_id, status=JobStatus.not_found, last_updated=None)
def recent() -> list[EntriesResult]:
    """Return up to 10 entries, newest ``created_at`` first.

    Entries are inner-joined with ``summaries`` and ``tags``, so only
    entries that have a row in both tables appear. The tags of each entry
    are concatenated with ``","`` in SQL and split back into a list here.

    Returns:
        One ``EntriesResult`` per row, in the order the query returned them.
    """
    # get the last 10 entries, join summary and tag, where each tag is
    # joined to a comma separated str
    recent_sql = """
        SELECT e.id, e.author, s.summary, GROUP_CONCAT(t.tag, ",") tags, e.created_at
        FROM entries e
        JOIN summaries s ON e.id = s.entry_id
        JOIN tags t ON e.id = t.entry_id
        GROUP BY e.id
        ORDER BY e.created_at DESC
        LIMIT 10
    """
    with get_db_cursor() as cursor:
        cursor.execute(recent_sql)
        rows = cursor.fetchall()

    return [
        EntriesResult(
            id=entry_id,
            author=author,
            summary=summary,
            tags=tag_csv.split(","),
            date=created_at,
        )
        for entry_id, author, summary, tag_csv, created_at in rows
    ]
def recent_tag(tag: str) -> list[EntriesResult]:
    """Return up to 10 entries carrying ``tag``, newest ``created_at`` first.

    Same query as ``recent``, restricted to entries whose id appears in
    ``tags`` with the requested tag. Each returned entry still lists all of
    its tags, not only the one that matched.

    Args:
        tag: Tag to filter by; a leading ``"#"`` is added when missing.

    Returns:
        One ``EntriesResult`` per row, in the order the query returned them.
    """
    # The lookup always uses the "#"-prefixed form of the tag.
    wanted = tag if tag.startswith("#") else "#" + tag

    # same as recent, but filter by tag
    tagged_sql = """
        SELECT e.id, e.author, s.summary, GROUP_CONCAT(t.tag, ",") tags, e.created_at
        FROM entries e
        JOIN summaries s ON e.id = s.entry_id
        JOIN tags t ON e.id = t.entry_id
        WHERE e.id IN (
            SELECT entry_id FROM tags WHERE tag = ?
        )
        GROUP BY e.id
        ORDER BY e.created_at DESC
        LIMIT 10
    """
    with get_db_cursor() as cursor:
        cursor.execute(tagged_sql, (wanted,))
        rows = cursor.fetchall()

    return [
        EntriesResult(
            id=entry_id,
            author=author,
            summary=summary,
            tags=tag_csv.split(","),
            date=created_at,
        )
        for entry_id, author, summary, tag_csv, created_at in rows
    ]
def clear() -> str:
    """Delete all rows from every table named in ``TABLES``.

    A warning is logged before anything is deleted.

    Returns:
        The string ``"OK"``.
    """
    # clear all tables
    logger.warning("Clearing all tables")
    with get_db_cursor() as cursor:
        for name in TABLES:
            # The table name is interpolated into the statement text; it
            # comes from the imported TABLES constant, not from a caller.
            statement = f"DELETE FROM {name}"
            cursor.execute(statement)
    return "OK"