|
import datetime
import random
import string
from typing import Dict, List

from src.domain.doc import Doc
from src.domain.wikidoc import WikiPage
from src.view.log_msg import create_msg_from
import src.tools.semantic_db as semantic_db
from src.tools.wiki import Wiki
from src.tools.llm_tools import get_wikilist, get_public_paragraph, get_private_paragraph
from src.tools.semantic_db import add_texts_to_collection, query_collection
|
|
|
|
|
class Controller:
    """Coordinate document styling, wiki sourcing and document generation.

    The controller keeps the working copies of the uploaded documents
    (``new_docs``), the documents generated from them (``gen_docs``), the
    active template, the style differences found against that template and
    a log of the operations applied so far.
    """

    def __init__(self, config: Dict):
        """Read the paths from *config* and load the default template.

        Args:
            config: settings mapping. The keys read here are
                ``'templates_path'``, ``'generated_docs_path'``,
                ``'styled_docs_path'``, ``'templates'`` and
                ``'default_template_index'``.
        """
        self.templates_path = config['templates_path']
        self.generated_docs_path = config['generated_docs_path']
        self.styled_docs_path = config['styled_docs_path']
        self.new_docs = []
        self.gen_docs = []

        default_template_name = config['templates'][config['default_template_index']]
        self.default_template = Doc(f"{self.templates_path}/{default_template_name}")
        self.template = self.default_template
        self.log = []
        self.differences = []

    def copy_docs(self, temp_docs: List):
        """Copy each uploaded document and register the copy in ``new_docs``.

        Args:
            temp_docs: objects exposing a ``name`` attribute that holds the
                path of the uploaded file.
        """
        docs = [Doc(path=temp_doc.name) for temp_doc in temp_docs]
        for temp_doc, doc in zip(temp_docs, docs):
            # File name without directory, cut at the FIRST dot.
            # NOTE(review): 'a.b.docx' and 'a.c.docx' both yield 'a' and would
            # therefore share one target path - confirm this is acceptable.
            doc_name = temp_doc.name.split('/')[-1].split('.')[0]
            # NOTE(review): the copy is written under generated_docs_path even
            # though styled_docs_path is configured - confirm this is intended.
            style_path = f"{self.generated_docs_path}/{doc_name}_.docx"
            self.new_docs.append(doc.copy(style_path))

    def clear_docs(self):
        """Call ``clear()`` on every tracked document, then empty the lists and the log."""
        for doc in self.new_docs + self.gen_docs:
            doc.clear()
        self.new_docs = []
        self.gen_docs = []
        self.log = []

    def set_template(self, template_name: str = ""):
        """Select the active template.

        Args:
            template_name: file name looked up under ``templates_path``; an
                empty value restores the default template.
        """
        if not template_name:
            self.template = self.default_template
            return
        self.template = Doc(f"{self.templates_path}/{template_name}")

    def get_difference_with_template(self):
        """Collect the styles of the working documents that differ from the template.

        Returns:
            A pair ``(differences, template_styles)``. ``differences`` is a
            list of ``{'doc': ..., 'style': ...}`` dicts, also stored on
            ``self.differences``; ``template_styles`` lists the template
            style names that start with ``'.'``.
        """
        self.differences = []
        for new_doc in self.new_docs:
            diff_styles = new_doc.get_different_styles_with_template(template=self.template)
            self.differences.extend({'doc': new_doc, 'style': style} for style in diff_styles)
        template_styles = [name for name in self.template.styles.names if name.startswith('.')]
        return self.differences, template_styles

    def map_style(self, this_style_index: int, template_style_name: str):
        """Map a style of a working document onto a style of the template.

        Args:
            this_style_index: index into ``self.differences`` as produced by
                ``get_difference_with_template``.
            template_style_name: name of the template style to map onto.
        """
        difference = self.differences[this_style_index]
        doc = difference['doc']
        log = doc.copy_one_style(difference['style'], template_style_name, self.template)
        self.log.append({doc.name: log})

    def apply_template(self, add_front_pages: bool):
        """Apply the active template to every working document.

        Args:
            add_front_pages: forwarded unchanged to each document's
                ``apply_template``.
        """
        for new_doc in self.new_docs:
            log = new_doc.apply_template(template=self.template, add_front_pages=add_front_pages)
            # Only documents that report something get a log entry.
            if log:
                self.log.append({new_doc.name: log})

    def reset(self):
        """Call ``delete()`` on every tracked document and empty both lists.

        Unlike ``clear_docs``, the log is left untouched.
        """
        for doc in self.new_docs + self.gen_docs:
            doc.delete()
        self.new_docs = []
        self.gen_docs = []

    def get_log(self):
        """Return the message built by ``create_msg_from`` from the log and the working documents."""
        return create_msg_from(self.log, self.new_docs)

    # ------------------------------------------------------------------
    # Source Control
    # ------------------------------------------------------------------

    def get_or_create_collection(self, id_: str) -> str:
        """Return *id_*, generating a new collection id when it is ``'-1'``.

        A generated id is the current month/day/hour/minute followed by a
        dash and ten random lowercase letters or digits; the matching
        collection is requested from ``semantic_db`` before returning.
        Any other *id_* is returned as is, without touching the database.
        """
        if id_ != '-1':
            return id_
        timestamp = datetime.datetime.now().strftime("%m%d%H%M")
        alphabet = string.ascii_lowercase + string.digits
        suffix = ''.join(random.choice(alphabet) for _ in range(10))
        new_id = f"{timestamp}-{suffix}"
        semantic_db.get_or_create_collection(new_id)
        return new_id

    def wiki_fetch(self) -> List[str]:
        """Return the wiki page titles proposed for the tasks of the working documents.

        Titles are de-duplicated and listed in order of first appearance, so
        the result is stable from one call to the next.
        """
        tasks = [task for new_doc in self.new_docs for task in new_doc.tasks]
        wiki_lists = [get_wikilist(task) for task in tasks]
        # dict.fromkeys removes duplicates while keeping insertion order.
        return list(dict.fromkeys(title for wiki_list in wiki_lists for title in wiki_list))

    async def wiki_upload_and_store(self, wiki_title: str, collection_name: str):
        """Fetch one wiki page and store its paragraphs in a collection.

        Args:
            wiki_title: title of the page to fetch; also recorded as the
                ``file`` of the stored texts.
            collection_name: collection that receives the paragraphs.
        """
        wikipage = Wiki().fetch(wiki_title)
        if isinstance(wikipage, str):
            # A plain string is not a page; presumably it is an error
            # message from the fetch (TODO confirm). It is printed as is.
            print(wikipage)
            return
        texts = WikiPage(wikipage.page_content).get_paragraphs()
        add_texts_to_collection(coll_name=collection_name, texts=texts, file=wiki_title, source='wiki')

    # ------------------------------------------------------------------
    # Generate Control
    # ------------------------------------------------------------------

    def generate_doc_from_db(self, collection_name: str, from_files: List[str]) -> List[str]:
        """Generate one document per working document from the stored texts.

        For each task of a working document a query is built with
        ``get_public_paragraph``, the collection is queried, and the task is
        resolved with ``get_private_paragraph``. The resolutions replace the
        tasks in a copy of the document, which is saved as docx and added to
        ``gen_docs``.

        Args:
            collection_name: collection to query.
            from_files: forwarded to ``query_collection`` as ``from_files``.

        Returns:
            The paths of the generated documents, one per working document.
        """
        gen_paths = []
        for new_doc in self.new_docs:
            queries = [get_public_paragraph(task) for task in new_doc.tasks]
            texts_list = [
                query_collection(coll_name=collection_name, query=query, from_files=from_files)
                for query in queries
            ]
            task_resolutions = [
                get_private_paragraph(task=task, texts=texts)
                for task, texts in zip(new_doc.tasks, texts_list)
            ]

            gen_doc = new_doc.copy(f"{self.generated_docs_path}/{new_doc.name}e.docx")
            gen_doc.replace_tasks(task_resolutions)
            gen_doc.save_as_docx()
            gen_paths.append(gen_doc.path)
            self.gen_docs.append(gen_doc)
        return gen_paths
|
|