import datetime as dt
import os
import sqlite3
from types import SimpleNamespace

import pytest
from fastapi.testclient import TestClient


def is_roughly_now(datetime_str):
    """Check if a datetime string is roughly from now.

    Args:
        datetime_str: Timestamp in a format accepted by
            ``datetime.fromisoformat``. A value without UTC offset is
            interpreted as UTC.

    Returns:
        True if the timestamp lies within 3 seconds of the current UTC time,
        in either direction.
    """
    now = dt.datetime.now(dt.timezone.utc)
    parsed = dt.datetime.fromisoformat(datetime_str)
    if parsed.tzinfo is None:
        # a naive datetime cannot be subtracted from an aware one, so attach
        # UTC; an offset that is already present is kept instead of clobbered
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    # abs(): a timestamp far in the future is not "roughly now" either
    return abs((now - parsed).total_seconds()) < 3


class TestWebservice:
    @pytest.fixture(autouse=True)
    def db_file(self, tmp_path, monkeypatch):
        """Set DB_FILE_NAME to a per-test sqlite path below ``tmp_path``."""
        filename = tmp_path / "test-db.sqlite"
        # monkeypatch undoes the change on teardown, so the variable does not
        # leak into tests that run afterwards
        monkeypatch.setenv("DB_FILE_NAME", str(filename))

    @pytest.fixture
    def cursor(self):
        """Yield a cursor obtained from ``gistillery.db.get_db_cursor``."""
        # imported lazily, i.e. only after the autouse db_file fixture has run
        from gistillery.db import get_db_cursor

        with get_db_cursor() as cursor:
            yield cursor

    @pytest.fixture
    def client(self):
        """Return a TestClient for the webservice after calling ``/clear``."""
        from gistillery.webservice import app

        client = TestClient(app)
        client.get("/clear")
        return client

    @pytest.fixture
    def registry(self):
        """Return a ToolRegistry populated with cheap dummy tools."""
        # use dummy models
        from gistillery.tools import Summarizer, Tagger
        from gistillery.preprocessing import RawTextProcessor
        from gistillery.registry import ToolRegistry

        class DummySummarizer(Summarizer):
            """Returns the first 10 characters of the input"""

            def get_name(self):
                return "dummy summarizer"

            def __call__(self, x):
                return x[:10]

        class DummyTagger(Tagger):
            """Returns the first 3 words of the input"""

            def get_name(self):
                return "dummy tagger"

            def __call__(self, x):
                # every returned word is prefixed with "#"
                return ["#" + word for word in x.split(maxsplit=4)[:3]]

        registry = ToolRegistry()
        registry.register_processor(RawTextProcessor())

        # arguments don't matter for dummy summarizer and tagger
        summarizer = DummySummarizer()
        registry.register_summarizer(summarizer)

        tagger = DummyTagger()
        registry.register_tagger(tagger)
        return registry

    def process_jobs(self, registry):
        """Process all currently pending jobs with the given registry."""
        # emulate work of the background worker
        from gistillery.worker import check_pending_jobs, process_job

        for job in check_pending_jobs():
            process_job(job, registry)

    def test_status(self, client):
        """``/status`` answers 200 with the JSON body "OK"."""
        resp = client.get("/status")
        assert resp.status_code == 200
        assert resp.json() == "OK"

    def test_recent_empty(self, client):
        """``/recent`` returns an empty list when nothing was submitted."""
        resp = client.get("/recent")
        assert resp.json() == []

    def test_recent_tag_empty(self, client):
        """``/recent/<tag>`` returns an empty list when nothing was submitted."""
        resp = client.get("/recent/general")
        assert resp.json() == []

    def test_submitted_job_status_pending(self, client, monkeypatch):
        """A submitted but unprocessed job is reported as "pending"."""
        # monkeypatch uuid4 to return a known value
        job_id = "abc1234"
        monkeypatch.setattr("uuid.uuid4", lambda: SimpleNamespace(hex=job_id))
        client.post("/submit", json={"author": "ben", "content": "this is a test"})

        resp = client.get(f"/check_job_status/{job_id}")
        output = resp.json()
        last_updated = output.pop("last_updated")
        assert output == {
            "id": job_id,
            "status": "pending",
        }
        assert is_roughly_now(last_updated)

    def test_submitted_job_status_not_found(self, client, monkeypatch):
        """Asking for an unknown job id yields "not found" and no timestamp."""
        # monkeypatch uuid4 to return a known value
        job_id = "abc1234"
        monkeypatch.setattr("uuid.uuid4", lambda: SimpleNamespace(hex=job_id))
        client.post("/submit", json={"author": "ben", "content": "this is a test"})

        other_job_id = "def5678"
        resp = client.get(f"/check_job_status/{other_job_id}")
        output = resp.json()
        last_updated = output.pop("last_updated")
        assert output == {
            "id": other_job_id,
            "status": "not found",
        }
        assert last_updated is None

    def test_submitted_job_failed(self, client, registry, monkeypatch):
        """A job whose processing raises is reported as "failed"."""
        # monkeypatch uuid4 to return a known value
        job_id = "abc1234"
        monkeypatch.setattr("uuid.uuid4", lambda: SimpleNamespace(hex=job_id))
        client.post("/submit", json={"author": "ben", "content": "this is a test"})

        # make the job processing fail by patching
        # gistillery.worker._process_job to raise an exception
        def failing_process_job(job, registry):
            raise RuntimeError("something went wrong")

        monkeypatch.setattr("gistillery.worker._process_job", failing_process_job)
        self.process_jobs(registry)

        resp = client.get(f"/check_job_status/{job_id}")
        output = resp.json()
        output.pop("last_updated")
        assert output == {
            "id": job_id,
            "status": "failed",
        }

    def test_submitted_job_status_done(self, client, registry, monkeypatch):
        """A processed job is reported as "done" with a fresh timestamp."""
        # monkeypatch uuid4 to return a known value
        job_id = "abc1234"
        monkeypatch.setattr("uuid.uuid4", lambda: SimpleNamespace(hex=job_id))
        client.post("/submit", json={"author": "ben", "content": "this is a test"})
        self.process_jobs(registry)

        resp = client.get(f"/check_job_status/{job_id}")
        output = resp.json()
        last_updated = output.pop("last_updated")
        assert output == {
            "id": job_id,
            "status": "done",
        }
        assert is_roughly_now(last_updated)

    def test_status_pending_jobs(self, client, registry, monkeypatch):
        """``/check_job_status/`` without an id summarizes the pending jobs."""
        resp = client.get("/check_job_status/")
        output = resp.json()
        assert output == "No pending jobs found"

        monkeypatch.setattr("uuid.uuid4", lambda: SimpleNamespace(hex="abc0"))
        client.post("/submit", json={"author": "ben", "content": "this is a test"})
        resp = client.get("/check_job_status/")
        output = resp.json()
        expected = "Found 1 pending job(s): abc0"
        assert output == expected

        for i in range(1, 10):
            # bind i as a default so the fake does not depend on the loop
            # variable's value at call time (late-binding closure)
            monkeypatch.setattr(
                "uuid.uuid4", lambda i=i: SimpleNamespace(hex=f"abc{i}")
            )
            client.post("/submit", json={"author": "ben", "content": "this is a test"})

        resp = client.get("/check_job_status/")
        output = resp.json()
        expected = "Found 10 pending job(s): abc0, abc1, abc2, ..."
        assert output == expected

    def test_recent_with_entries(self, client, registry):
        """``/recent`` returns author, summary and tags of processed entries."""
        # submit 2 entries
        client.post(
            "/submit", json={"author": "maxi", "content": "this is a first test"}
        )
        client.post(
            "/submit",
            json={"author": "mini", "content": "this would be something else"},
        )
        self.process_jobs(registry)
        resp = client.get("/recent").json()

        # results are sorted by recency but since dummy models are so fast, the
        # date in the db could be the same, so we sort by author
        resp = sorted(resp, key=lambda x: x["author"])
        assert len(resp) == 2

        resp0 = resp[0]
        assert resp0["author"] == "maxi"
        assert resp0["summary"] == "this is a "
        assert resp0["tags"] == sorted(["#this", "#is", "#a"])

        resp1 = resp[1]
        assert resp1["author"] == "mini"
        assert resp1["summary"] == "this would"
        assert resp1["tags"] == sorted(["#this", "#would", "#be"])

    def test_recent_tag_with_entries(self, client, registry):
        """``/recent/<tag>`` only returns the entries carrying that tag."""
        # submit 2 entries
        client.post(
            "/submit", json={"author": "maxi", "content": "this is a first test"}
        )
        client.post(
            "/submit",
            json={"author": "mini", "content": "this would be something else"},
        )
        self.process_jobs(registry)

        # the "this" tag is in both entries
        resp = client.get("/recent/this").json()
        assert len(resp) == 2

        # the "would" tag is in only one entry
        resp = client.get("/recent/would").json()
        assert len(resp) == 1

        resp0 = resp[0]
        assert resp0["author"] == "mini"
        assert resp0["summary"] == "this would"
        assert resp0["tags"] == sorted(["#this", "#would", "#be"])

    def test_recent_multiple_entries(self, client, registry):
        """``/recent/<tag>,<tag>`` matches entries carrying any listed tag."""
        # submit 3 entries
        client.post(
            "/submit", json={"author": "maxi", "content": "aardvark ant antelope"}
        )
        client.post(
            "/submit",
            json={"author": "mini", "content": "bat bear bee"},
        )
        client.post(
            "/submit",
            json={"author": "mini", "content": "camel canary cat"},
        )
        self.process_jobs(registry)

        # the "ant" tag is in only one entry
        resp = client.get("/recent/ant").json()
        assert len(resp) == 1

        # "ant" and "bee" are in two entries
        resp = client.get("/recent/ant,bee").json()
assert len(resp) == 2 # "ant" and "bee" and "cat" are in three entries resp = client.get("/recent/cat,ant,bee").json() assert len(resp) == 3 def test_tag_count(self, client, registry): # submit 2 entries client.post( "/submit", json={"author": "ben", "content": "aardvark ant antelope"} ) client.post( "/submit", json={"author": "ben", "content": "aardvark ant bat"}, ) client.post( "/submit", json={"author": "ben", "content": "aardvark camel canary"}, ) self.process_jobs(registry) resp = client.get("/tag_counts").json() expected = { "#aardvark": 3, "#ant": 2, "#antelope": 1, "#bat": 1, "#camel": 1, "#canary": 1, } assert resp == expected def test_clear(self, client, cursor, registry): client.post("/submit", json={"author": "ben", "content": "this is a test"}) self.process_jobs(registry) assert cursor.execute("SELECT COUNT(*) c FROM entries").fetchone()[0] == 1 client.get("/clear") assert cursor.execute("SELECT COUNT(*) c FROM entries").fetchone()[0] == 0 def test_inputs_stored(self, client, cursor, registry): client.post("/submit", json={"author": "ben", "content": " this is a test\n"}) self.process_jobs(registry) rows = cursor.execute("SELECT * FROM inputs").fetchall() assert len(rows) == 1 assert rows[0].input == "this is a test" def test_submit_url(self, client, cursor, registry, monkeypatch): class MockClient: """Mock httpx Client, return www.example.com content""" def get(self, url): return SimpleNamespace( text=''' \n\n
\nThis domain is for use in illustrative examples in documents. You may use this\n domain in literature without prior coordination or asking for permission.
\n \n