File size: 1,988 Bytes
5e20c77
99a3f34
452939a
 
 
c0c01c6
452939a
c0c01c6
5b3feea
99a3f34
 
 
1cb46fc
5b3feea
 
5e20c77
 
c0c01c6
10330bc
 
 
 
1cb46fc
10330bc
 
 
 
 
5b3feea
10330bc
 
 
5e20c77
c0c01c6
10330bc
 
c0c01c6
 
99a3f34
c0c01c6
99a3f34
c0c01c6
5b3feea
c0c01c6
99a3f34
 
c0c01c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from chainlit.types import AskFileResponse
import click
from langchain_community.document_loaders import TextLoader
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings.openai import OpenAIEmbeddings
import chainlit as cl
from src.config import Config

from src.config import Config
import logging

# text_splitter = RecursiveCharacterTextSplitter()
# embeddings = OpenAIEmbeddings()

def process_file(file: AskFileResponse):
    import tempfile

    if file.type == "text/plain":
        Loader = TextLoader
    elif file.type == "application/pdf":
        Loader = PyPDFLoader

    with tempfile.NamedTemporaryFile() as tempfile:
        tempfile.write(file.content)
        loader = Loader(tempfile.name)
        documents = loader.load()
        docs = Config.text_splitter.split_documents(documents)
        for i, doc in enumerate(docs):
            doc.metadata["source"] = f"source_{i}"
        return docs

def get_docsearch(file: AskFileResponse):
    docs = process_file(file)

    # Save data in the user session
    cl.user_session.set("docs", docs)

    # Create a unique namespace for the file

    docsearch = Chroma.from_documents(
        docs, Config.embeddings
    )
    return docsearch

def get_source(answer,source_documents):
        text_elements = []
        if source_documents:
            for source_idx, source_doc in enumerate(source_documents):
                source_name = f"source_{source_idx}"

                text_elements.append(
                    cl.Text(content=source_doc.page_content, name=source_name)
                )
            source_names = [text_el.name for text_el in text_elements]

            if source_names:
                answer += f"\nSources: {', '.join(source_names)}"
            else:
                answer += "\nNo source found"
        return text_elements