|
import streamlit as st |
|
import logging |
|
import sys |
|
import os |
|
import re |
|
from langchain.chat_models import ChatOpenAI |
|
from langchain.llms import OpenAI |
|
from crawlbase import CrawlingAPI |
|
from langchain.output_parsers import StructuredOutputParser |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain.embeddings.openai import OpenAIEmbeddings |
|
from langchain.vectorstores import Qdrant |
|
from langchain.prompts import ChatPromptTemplate |
|
from elevenlabs import generate, play, set_api_key |
|
from langchain.schema import ( |
|
AIMessage, |
|
HumanMessage, |
|
SystemMessage |
|
) |
|
import random |
|
from urllib.parse import urlparse, urlunparse |
|
|
|
# --- Credentials -----------------------------------------------------------
# All three keys are read from Streamlit's secrets store at import time.
# The ElevenLabs key is registered with the elevenlabs client immediately;
# the other two are kept as module-level values for the functions below.
set_api_key(st.secrets["ELEVENLABS_API_KEY"])
crawling_api_key = st.secrets["CRAWLING_API_KEY"]
open_api_key = st.secrets["OPENAI_API_KEY"]

# --- Logging ---------------------------------------------------------------
# Root logger writes to stdout at DEBUG level.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

# --- Page configuration ----------------------------------------------------
PAGE_TITLE: str = "Doodle"
PAGE_ICON: str = "🗨️"

st.set_page_config(page_title=PAGE_TITLE, page_icon=PAGE_ICON)
|
|
|
|
|
def get_llm(model_name, model_temperature, api_key, max_tokens=None):
    """Build the LangChain model wrapper for ``model_name``.

    ``"text-davinci-003"`` is wrapped in ``OpenAI``; every other model name is
    wrapped in ``ChatOpenAI``.  Both wrappers receive the same temperature,
    model name, token limit and API key.

    Args:
        model_name: Name of the OpenAI model to use.
        model_temperature: Sampling temperature handed to the wrapper.
        api_key: OpenAI API key.
        max_tokens: Optional completion length limit (``None`` by default).

    Returns:
        The constructed ``OpenAI`` or ``ChatOpenAI`` instance.
    """
    wrapper_cls = OpenAI if model_name == "text-davinci-003" else ChatOpenAI
    return wrapper_cls(
        temperature=model_temperature,
        model_name=model_name,
        max_tokens=max_tokens,
        openai_api_key=api_key,
    )
|
|
|
|
|
def is_valid_web_link(url):
    """Return a cleaned copy of ``url`` if it is an absolute web link, else ``None``.

    A link is accepted when it parses with an ``http`` or ``https`` scheme and
    a non-empty network location.  The returned string is the same URL with
    its query string and path parameters removed (the fragment is kept).

    Args:
        url: Candidate link, typically pasted by the user or taken from the
            page's query parameters.

    Returns:
        The cleaned URL string, or ``None`` when ``url`` is not a usable
        web link (not a string, no scheme/host, non-web scheme, or malformed).
    """
    if not isinstance(url, str):
        return None

    try:
        # Pasted links frequently carry stray whitespace around them.
        parsed_url = urlparse(url.strip())
    except ValueError:
        # urlparse rejects some malformed hosts (e.g. an unbalanced "[");
        # report those as "not a valid link" instead of raising.
        return None

    if parsed_url.scheme not in ("http", "https") or not parsed_url.netloc:
        return None

    return urlunparse(parsed_url._replace(query='', params=''))
|
|
|
|
|
@st.cache_data
def scrape_the_article(url):
    """Fetch ``url`` through the Crawling API and return the parsed page.

    The request asks for JSON output with auto-parsing and scrolling enabled.
    The value returned is the ``'json'`` entry of the API response; ``main``
    reads its ``'title'`` and ``'content'`` keys.

    Args:
        url: Address of the page to scrape.

    Returns:
        The ``'json'`` entry of the Crawling API response.
    """
    request_options = {'format': 'json', 'autoparse': 'true', 'scroll': 'true'}
    crawler = CrawlingAPI({'token': crawling_api_key})
    return crawler.get(url, options=request_options)['json']
|
|
|
|
|
def init_session() -> None:
    """Seed ``st.session_state`` with defaults on the first run of a session.

    The ``init`` flag marks the session as initialised so later reruns leave
    the stored values alone.
    """
    if 'init' in st.session_state:
        return

    st.session_state.init = True
    st.session_state.question = None

    # NOTE(review): the original indentation was lost in extraction; this
    # assignment is assumed to belong to the one-time initialisation — confirm.
    st.session_state.messages = []
|
|
|
|
|
# Output-format description substituted into the summarisation prompt.
_SUMMARY_FORMAT_INSTRUCTIONS = """
The output should be a markdown code snippet formatted in the following schema, including the leading and trailing \\"```json\\" and \\"```\\":
```json{
"summary": string // overall text summary
"blocks": [
{
"block_summary": string // The summary of the first block
"block_question": string // What is the question to clarify?
}, ...
]}
```
"""

# Prompt asking the model for an overall summary plus per-block summaries and questions.
_SUMMARY_PROMPT_TEMPLATE = """You are an advanced copywriter who can discuss and summarise articles. Translate the text to English if required. You instructions: 1) Write a concise summary of the whole text; 2) Break down the text into logical blocks containing unique information, extract important information for each block and write a summary using this information; 3) Generate relevant critical questions related to each block; 4) Format the output according to format instructions. Here is the text:
``` {text} ```
Format instructions: ``` {format_instructions} ```
Answer:"""


@st.cache_data
def get_content_summary(content, model_name, api_key):
    """Ask the model to summarise ``content`` and return the parsed reply.

    The prompt requests a JSON code snippet holding an overall ``summary`` and
    a list of ``blocks`` (each with ``block_summary`` and ``block_question``).
    Both the outgoing messages and the raw reply are logged at INFO level.

    Args:
        content: Text of the page to summarise.
        model_name: Model name forwarded to ``get_llm``.
        api_key: OpenAI API key forwarded to ``get_llm``.

    Returns:
        The reply parsed by a ``StructuredOutputParser`` built with no
        response schemas; ``main`` reads its ``'summary'`` and ``'blocks'`` keys.
    """
    llm = get_llm(model_name=model_name, model_temperature=0, api_key=api_key)

    chat_prompt = ChatPromptTemplate.from_template(template=_SUMMARY_PROMPT_TEMPLATE)
    request_messages = chat_prompt.format_messages(
        text=content,
        format_instructions=_SUMMARY_FORMAT_INSTRUCTIONS,
    )
    logging.info(request_messages)

    reply = llm(request_messages)
    logging.info(reply)

    # No schemas are registered, so no specific keys are requested of the parser.
    reply_parser = StructuredOutputParser.from_response_schemas([])
    return reply_parser.parse(reply.content)
|
|
|
|
|
@st.cache_data
def generate_audio(text):
    """Synthesize speech for ``text`` with the ``eleven_monolingual_v1`` model.

    The voice is chosen per call from a random integer in 1..10: even values
    select "Matthew", odd values select "Dorothy".

    Args:
        text: The text to read aloud.

    Returns:
        Whatever ``elevenlabs.generate`` returns for the request.
    """
    drew_even = random.randint(1, 10) % 2 == 0
    voice_name = "Matthew" if drew_even else 'Dorothy'
    return generate(text=text, voice=voice_name, model="eleven_monolingual_v1")
|
|
|
|
|
@st.cache_resource
def get_retriever(content):
    """Index ``content`` in an in-memory Qdrant collection and return the store.

    The text is split into chunks of at most 300 characters with a 20-character
    overlap, embedded with ``text-embedding-ada-002`` and stored in a
    collection named ``"qa"``.

    Args:
        content: Full text of the page to index.

    Returns:
        The ``Qdrant`` vector store; ``qa`` calls ``similarity_search`` on it.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=300,
        chunk_overlap=20,
        length_function=len,
        is_separator_regex=False,
    )
    chunks = splitter.create_documents([content])
    embedder = OpenAIEmbeddings(model="text-embedding-ada-002")
    return Qdrant.from_documents(
        chunks,
        embedder,
        location=":memory:",
        collection_name="qa",
    )
|
|
|
|
|
# Prompt used to answer a user question from retrieved page chunks.
_QA_PROMPT_TEMPLATE = """
You're an experienced copywriter. Answer the question in English. Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer.
Answer the question in a way so that the reader has no more questions. Be concise. Make sure you mention all the important information. You can add additional relevant information from yourself that you think may contribute to the overall understanding. Asses critically the provided context, chat history or own answer.
Chat History: ``` {chat_history} ```
Context: ``` {context} ```
Question: ``` {question} ```
Helpful Answer:
"""


@st.cache_data
def _answer_from_content(query, documents_to_search, model_name, api_key, content, block_summaries):
    """Answer ``query`` from ``content``; cached on every input that shapes the answer.

    ``content`` and ``block_summaries`` are explicit parameters so that they
    take part in the cache key alongside the question and model settings.
    """
    retriever = get_retriever(content)
    found_docs = retriever.similarity_search(query, k=documents_to_search)
    llm = get_llm(model_name=model_name, model_temperature=0, api_key=api_key)
    prompt = ChatPromptTemplate.from_template(template=_QA_PROMPT_TEMPLATE)

    # The block summaries stand in for prior conversation in the prompt.
    chat_history = [AIMessage(content=' '.join(block_summaries))]
    messages = prompt.format_messages(context=found_docs, question=query, chat_history=chat_history)
    response = llm(messages)
    return response.content


def qa(query, documents_to_search, model_name, api_key):
    """Answer ``query`` about the page currently held in the session.

    Reads the page text (``st.session_state.content``) and the block summaries
    (``st.session_state.content_block_summary``) and delegates to a cached
    helper.  Caching happens in the helper rather than here because the
    session values are not arguments of this function: caching on this
    signature alone would return an answer computed for one page when the same
    question is asked about another.

    Args:
        query: The user's question.
        documents_to_search: Number of chunks to retrieve as context.
        model_name: Model name forwarded to ``get_llm``.
        api_key: OpenAI API key forwarded to ``get_llm``.

    Returns:
        The model's answer text.
    """
    return _answer_from_content(
        query,
        documents_to_search,
        model_name,
        api_key,
        content=st.session_state.content,
        block_summaries=list(st.session_state.content_block_summary),
    )
|
|
|
|
|
def show_audio_message(message):
    """Write ``message`` to the page, followed by an audio player reading it.

    Args:
        message: Text to display and to pass to ``generate_audio``.
    """
    st.write(message)
    st.audio(generate_audio(message))
|
|
|
|
|
def show_question_input():
    """Render the question box, a Submit button and the example questions.

    A submitted or clicked question is stored in ``st.session_state.question``.
    Example questions come from ``st.session_state.content_block_questions``.
    """

    def submit_question():
        # on_change callback of the text area: move the typed question into
        # the session and clear the widget.
        typed = st.session_state.user_question_widget.strip()
        if typed:
            st.session_state.question = typed
            st.session_state.user_question_widget = ''
        else:
            # Whitespace-only input is treated the same as no input.
            logging.info("empty user question")

    st.text_area(label="Ask your question about the content of the page:",
                 key='user_question_widget',
                 on_change=submit_question)
    # The button has no callback of its own; the text area's on_change
    # callback above is what records the question.
    st.button("Submit")

    def on_question_button(question):
        st.session_state.question = question

    with st.expander("Example questions:"):
        for index, q in enumerate(st.session_state.content_block_questions):
            # An explicit key keeps buttons distinct even when two generated
            # questions have identical text.
            st.button(q, key=f"example_question_{index}",
                      on_click=on_question_button, args=[q])
|
|
|
|
|
def get_query_params():
    """Take the page link from the ``web_url`` query parameter, if present.

    Does nothing when the session already has a ``web_url``.  Otherwise the
    first ``web_url`` query value is validated with ``is_valid_web_link`` and,
    when accepted, stored in ``st.session_state.web_url``.
    """
    if 'web_url' in st.session_state:
        return

    params = st.experimental_get_query_params()
    logging.debug(f"query parameters: {params}")
    if 'web_url' not in params:
        return

    candidate = params['web_url'][0]
    if len(candidate) == 0:
        return

    cleaned = is_valid_web_link(candidate)
    if cleaned:
        st.session_state.web_url = cleaned
|
|
|
|
|
def show_header():
    """Show the discussed link and, once known, the page title as captions.

    Nothing is rendered until a ``web_url`` is stored in the session.
    """
    if 'web_url' not in st.session_state:
        return

    url_column, title_column = st.columns(2)
    url_column.caption(f"discussing: {st.session_state.web_url}")
    if 'title' in st.session_state:
        title_column.caption(f"{st.session_state.title}")
|
|
|
|
|
def get_random_page():
    """Return the link used by the "Random Page" button (a single fixed page)."""
    page_url = 'https://mailchi.mp/expresso/lightpeak'
    return page_url
|
|
|
|
|
# Introductory text shown on the landing page.
_LANDING_DESCRIPTION = """\
Meet 'Doodle,' your shortcut to understanding the web! Got a lengthy article you're eyeing?
Just paste the link, and in an instant, Doodle delivers a crisp summary and intriguing questions for you to
chew on. Want to go hands-free? Doodle's text-to-speech feature will read it to you! Why the name 'Doodle'?
Just as a simple doodle can encapsulate a whole idea, we distill webpages down to their essence!
"""

# Message shown when any step of the page flow raises.
_UNEXPECTED_ERROR_MESSAGE = """\
Whoops, looks like a hiccup in the system! But no worries, our tech wizards are already on
the case, working their magic. In the meantime, how about giving it another shot?
"""


def _show_landing_page() -> None:
    """Render the start page and store a valid link entered by the user."""
    st.header("Doodle")
    st.image("./assets/doodle-img.jpg")
    st.caption(_LANDING_DESCRIPTION)
    st.divider()

    web_url = st.text_input(label='Paste your link, e.g. https://expresso.today',
                            label_visibility='collapsed',
                            placeholder='Paste your link, e.g. https://expresso.today')
    col1, _, _, _, col2 = st.columns(5)
    col1.button("Doodle")
    if col2.button("Random Page"):
        web_url = get_random_page()

    if not web_url:
        return

    cleaned_url = is_valid_web_link(web_url)
    if cleaned_url:
        st.session_state.web_url = cleaned_url
        st.experimental_rerun()
    else:
        st.warning(
            "Whoops! That link seems to be doing the vanishing act. Could you give it another shot? Magic words: 'Valid Link, Please!' 🪄")


def _load_web_page() -> None:
    """Scrape the stored link and keep its title and content in the session."""
    with st.spinner(f"reading the web page '{st.session_state.web_url}' ..."):
        st.session_state.web_page = scrape_the_article(st.session_state.web_url)
        st.session_state.title = st.session_state.web_page['title']
        st.session_state.content = st.session_state.web_page['content']
        st.experimental_rerun()


def _summarise_content() -> None:
    """Summarise the scraped content, store the result and present it."""
    content_summary = get_content_summary(content=st.session_state.content, model_name="gpt-3.5-turbo-16k",
                                          api_key=open_api_key)

    # Extract everything before touching the session: 'content_summary' is the
    # key main() branches on, so it must not be stored unless the block lists
    # were extracted successfully as well.
    summary_text = content_summary['summary']
    blocks = content_summary['blocks']
    block_summaries = [block['block_summary'] for block in blocks]
    block_questions = [block['block_question'] for block in blocks]

    st.session_state.content_block_summary = block_summaries
    st.session_state.content_block_questions = block_questions
    st.session_state.content_summary = summary_text

    show_audio_message(st.session_state.content_summary)
    show_question_input()


def _answer_pending_question() -> None:
    """Answer the question stored in the session, then clear it."""
    question = st.session_state.question
    st.subheader(question)
    st.divider()
    with st.spinner('answering the question...'):
        answer = qa(query=question, documents_to_search=20, model_name='gpt-4', api_key=open_api_key)
    show_audio_message(answer)
    st.session_state.question = None
    show_question_input()


def main() -> None:
    """Run one pass of the page flow, chosen by what the session already holds.

    Stages, in order: no link yet -> landing page; link but no content ->
    scrape the page; content but no summary -> summarise and present; a pending
    question -> answer it; otherwise just show the question input.

    Any exception raised along the way is logged with its traceback and
    replaced on the page by a friendly warning with a retry button.
    """
    try:
        get_query_params()
        init_session()
        show_header()

        if 'web_url' not in st.session_state:
            _show_landing_page()
        elif 'content' not in st.session_state:
            _load_web_page()
        elif 'content_summary' not in st.session_state:
            _summarise_content()
        elif 'question' in st.session_state and st.session_state.question is not None:
            _answer_pending_question()
        else:
            show_question_input()
    except Exception as e:
        # logging.exception keeps the traceback, which logging.error(e) would drop.
        logging.exception(e)
        st.warning(_UNEXPECTED_ERROR_MESSAGE)
        if st.button("Give It Another Go!"):
            st.experimental_rerun()
|
|
|
|
|
# Entry point: run the page flow when this file is executed as the main script.
if __name__ == "__main__":
    main()
|
|
|
|
|
|
|
|
|
|
|
|