Spaces:
Running
Running
from pathlib import Path | |
from pprint import pprint | |
import os | |
import shutil | |
import sys | |
root_path = Path(__file__).parent.parent | |
sys.path.append(str(root_path)) | |
from server.knowledge_base.kb_service.base import KBServiceFactory | |
from server.knowledge_base.utils import get_kb_path, get_doc_path, KnowledgeFile | |
from server.knowledge_base.migrate import folder2db, prune_db_docs, prune_folder_files | |
# Fixture: a throwaway knowledge base whose document folder is seeded with
# the sample files listed in ``test_files`` (file name -> source path).
kb_name = "test_kb_for_migrate"
test_files = {
    "readme.md": str(root_path / "readme.md"),
}

kb_path = get_kb_path(kb_name)
doc_path = get_doc_path(kb_name)

# Create the document folder when it is missing; an existing folder is kept.
os.makedirs(doc_path, exist_ok=True)

# Copy each sample file into the knowledge base's document folder.
for file_name, source_path in test_files.items():
    shutil.copy(source_path, os.path.join(doc_path, file_name))
def test_recreate_vs():
    """Rebuild the vector store from the folder and check every sample file.

    Runs ``folder2db`` in ``"recreate_vs"`` mode, then asserts that the
    knowledge base exists, that each name in ``test_files`` is listed, and
    that its docs can be fetched both by file name and by a ``source``
    metadata filter, with every returned doc pointing back at that file.
    """
    folder2db([kb_name], "recreate_vs")

    kb = KBServiceFactory.get_service_by_name(kb_name)
    assert kb and kb.exists()

    files = kb.list_files()
    print(files)
    for name in test_files:
        assert name in files

        # list docs based on file name
        docs = kb.list_docs(file_name=name)
        assert len(docs) > 0
        pprint(docs[0])
        for doc in docs:
            assert doc.metadata["source"] == name

        # list docs based on metadata
        docs = kb.list_docs(metadata={"source": name})
        assert len(docs) > 0
        for doc in docs:
            assert doc.metadata["source"] == name
def test_increment():
    """Clear the vector store, re-import incrementally, and verify the result.

    After ``clear_vs`` the knowledge base must report no files and no docs.
    ``folder2db`` in ``"increment"`` mode must then make every sample file
    listed again, with at least one doc per file whose ``source`` metadata
    equals the file name.
    """
    service = KBServiceFactory.get_service_by_name(kb_name)

    # Start from an empty vector store.
    service.clear_vs()
    assert service.list_files() == []
    assert service.list_docs() == []

    folder2db([kb_name], "increment")

    listed = service.list_files()
    print(listed)
    for file_name in test_files:
        assert file_name in listed

        file_docs = service.list_docs(file_name=file_name)
        assert len(file_docs) > 0
        pprint(file_docs[0])

        # Every doc returned for the file must point back at it.
        assert all(d.metadata["source"] == file_name for d in file_docs)
def test_prune_db():
    """Delete a sample file from disk and check ``prune_db_docs`` forgets it.

    The first name in ``test_files`` is removed from the document folder.
    After pruning, the knowledge base must no longer list it nor return docs
    for it. When a second sample file exists it is used as a control and must
    still be listed with docs. The removed file is always copied back so
    later tests find the folder as the module-level setup left it.
    """
    names = list(test_files)
    assert names, "test_files must define at least one sample file"
    del_file = names[0]
    # The control file is optional: unpacking two names unconditionally
    # raises ValueError when ``test_files`` has a single entry.
    keep_file = names[1] if len(names) > 1 else None

    del_path = os.path.join(doc_path, del_file)
    os.remove(del_path)
    try:
        prune_db_docs([kb_name])

        kb = KBServiceFactory.get_service_by_name(kb_name)
        files = kb.list_files()
        print(files)
        assert del_file not in files

        docs = kb.list_docs(file_name=del_file)
        assert len(docs) == 0

        if keep_file is not None:
            assert keep_file in files
            docs = kb.list_docs(file_name=keep_file)
            assert len(docs) > 0
            pprint(docs[0])
    finally:
        # Restore the sample file even when an assertion above fails.
        shutil.copy(test_files[del_file], del_path)
def test_prune_folder():
    """Delete a file's docs from the KB and check ``prune_folder_files``.

    The docs of the first name in ``test_files`` are removed through
    ``kb.delete_doc``; the test asserts the file is then unlisted but still
    present on disk. After ``prune_folder_files`` the file must be gone from
    the document folder. When a second sample file exists it is used as a
    control and must stay listed, keep its docs, and remain on disk.
    """
    names = list(test_files)
    assert names, "test_files must define at least one sample file"
    del_file = names[0]
    # The control file is optional: unpacking two names unconditionally
    # raises ValueError when ``test_files`` has a single entry.
    keep_file = names[1] if len(names) > 1 else None

    kb = KBServiceFactory.get_service_by_name(kb_name)

    # delete docs for file
    kb.delete_doc(KnowledgeFile(del_file, kb_name))
    files = kb.list_files()
    print(files)
    assert del_file not in files

    docs = kb.list_docs(file_name=del_file)
    assert len(docs) == 0

    if keep_file is not None:
        assert keep_file in files
        docs = kb.list_docs(file_name=keep_file)
        assert len(docs) > 0

    # The file itself must still be in the folder before pruning.
    assert os.path.isfile(os.path.join(doc_path, del_file))

    # prune folder
    prune_folder_files([kb_name])

    # check result
    assert not os.path.isfile(os.path.join(doc_path, del_file))
    if keep_file is not None:
        assert os.path.isfile(os.path.join(doc_path, keep_file))
def test_drop_kb():
    """Drop the test knowledge base and verify nothing is left behind.

    After ``drop_kb`` the service must report that it no longer exists, the
    knowledge base folder must be gone, and a fresh lookup by name must
    return ``None``.
    """
    service = KBServiceFactory.get_service_by_name(kb_name)
    service.drop_kb()

    still_exists = service.exists()
    assert not still_exists
    folder_present = os.path.isdir(kb_path)
    assert not folder_present

    # A new lookup must no longer resolve the dropped knowledge base.
    assert KBServiceFactory.get_service_by_name(kb_name) is None