"""Doodle: summarise a web page, read the summary aloud and answer questions about it."""
import logging
import os
import random
import re
import sys
from urllib.parse import urlparse, urlunparse

import streamlit as st
from crawlbase import CrawlingAPI
from elevenlabs import generate, play, set_api_key
from langchain.chat_models import ChatOpenAI
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.output_parsers import StructuredOutputParser
from langchain.prompts import ChatPromptTemplate
from langchain.schema import (
    AIMessage,
    HumanMessage,
    SystemMessage
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Qdrant

set_api_key(st.secrets["ELEVENLABS_API_KEY"])
crawling_api_key = st.secrets["CRAWLING_API_KEY"]
open_api_key = st.secrets["OPENAI_API_KEY"]

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

PAGE_TITLE: str = "Doodle"
PAGE_ICON: str = "🗨️"

st.set_page_config(page_title=PAGE_TITLE, page_icon=PAGE_ICON)

# NOTE(review): the original line breaks inside the prompt / UI strings below
# were lost when the file was whitespace-mangled; the wording is unchanged but
# the line layout is a reconstruction.
_FORMAT_INSTRUCTIONS = """
The output should be a markdown code snippet formatted in the following schema, including the leading and trailing \\"```json\\" and \\"```\\":

```json{
    "summary": string // overall text summary
    "blocks": [
        {
            "block_summary": string // The summary of the first block
            "block_question": string // What is the question to clarify?
        },
        ...
    ]}
```
"""

_SUMMARY_PROMPT_TEMPLATE = """You are an advanced copywriter who can discuss and summarise articles.
Translate the text to English if required.
You instructions:
1) Write a concise summary of the whole text;
2) Break down the text into logical blocks containing unique information, extract important information for each block and write a summary using this information;
3) Generate relevant critical questions related to each block;
4) Format the output according to format instructions.

Here is the text:
```
{text}
```

Format instructions:
```
{format_instructions}
```

Answer:"""

_QA_PROMPT_TEMPLATE = """
You're an experienced copywriter. Answer the question in English.
Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Answer the question in a way so that the reader has no more questions. Be concise. Make sure you mention all the important information.
You can add additional relevant information from yourself that you think may contribute to the overall understanding.
Asses critically the provided context, chat history or own answer.

Chat History:
```
{chat_history}
```

Context:
```
{context}
```

Question:
```
{question}
```

Helpful Answer:
"""

_LANDING_DESCRIPTION = (
    "Meet 'Doodle,' your shortcut to understanding the web!\n"
    "Got a lengthy article you're eyeing? Just paste the link, and in an instant, "
    "Doodle delivers a crisp summary and intriguing questions for you to chew on.\n"
    "Want to go hands-free? Doodle's text-to-speech feature will read it to you!\n"
    "Why the name 'Doodle'? Just as a simple doodle can encapsulate a whole idea, "
    "we distill webpages down to their essence!\n"
)

_INVALID_LINK_WARNING = (
    "Whoops! That link seems to be doing the vanishing act. "
    "Could you give it another shot? Magic words: 'Valid Link, Please!' 🪄"
)

_ERROR_WARNING = (
    "Whoops, looks like a hiccup in the system! But no worries, our tech wizards "
    "are already on the case, working their magic.\n"
    "In the meantime, how about giving it another shot?\n"
)


def get_llm(model_name, model_temperature, api_key, max_tokens=None):
    """Build the LangChain model wrapper for *model_name*.

    Args:
        model_name: OpenAI model identifier; "text-davinci-003" selects the
            completion-style ``OpenAI`` wrapper, anything else ``ChatOpenAI``.
        model_temperature: Sampling temperature passed to the model.
        api_key: OpenAI API key.
        max_tokens: Optional completion length limit. When ``None`` the
            argument is not forwarded, so each wrapper keeps its own default.

    Returns:
        An ``OpenAI`` or ``ChatOpenAI`` instance.
    """
    llm_kwargs = {
        "temperature": model_temperature,
        "model_name": model_name,
        "openai_api_key": api_key,
    }
    # Forward max_tokens only when set: the completion wrapper declares it as
    # a plain int, so an explicit None is presumably rejected by validation.
    if max_tokens is not None:
        llm_kwargs["max_tokens"] = max_tokens
    llm_class = OpenAI if model_name == "text-davinci-003" else ChatOpenAI
    return llm_class(**llm_kwargs)


def is_valid_web_link(url):
    """Validate *url* and return a cleaned copy, or ``None`` when unusable.

    A link is accepted when it parses, uses the http or https scheme and has
    a network location. The query string and path parameters are removed
    from the returned URL; the fragment is kept.

    Args:
        url: Candidate URL as typed by the user or taken from query params.

    Returns:
        The cleaned URL string, or ``None`` if the link is not valid.
    """
    try:
        parsed_url = urlparse(url.strip())
    except (ValueError, AttributeError):
        # ValueError: malformed URL (e.g. unbalanced IPv6 brackets);
        # AttributeError: a non-string such as None was supplied.
        return None
    if parsed_url.scheme not in ("http", "https") or not parsed_url.netloc:
        return None
    return urlunparse(parsed_url._replace(query='', params=''))


@st.cache_data
def scrape_the_article(url):
    """Fetch *url* through the Crawlbase API and return its parsed JSON payload."""
    api = CrawlingAPI({'token': crawling_api_key})
    response = api.get(url, options={'format': 'json', 'autoparse': 'true', 'scroll': 'true'})
    # dict_keys(['alert', 'title', 'favicon', 'meta', 'content', 'canonical', 'images', 'grouped_images', 'og_images', 'links'])
    content = response['json']
    return content


def init_session() -> None:
    """Initialise the per-session state on the first run of the script."""
    if 'init' not in st.session_state:
        st.session_state.init = True
        st.session_state.question = None
        st.session_state.messages = []


@st.cache_data
def get_content_summary(content, model_name, api_key):
    """Ask the LLM for an overall summary plus per-block summaries and questions.

    Args:
        content: Text of the web page.
        model_name: Model identifier passed to ``get_llm``.
        api_key: OpenAI API key.

    Returns:
        The dict parsed from the model's JSON markdown answer (expected keys
        per the format instructions: "summary" and "blocks").
    """
    llm = get_llm(model_name=model_name, model_temperature=0, api_key=api_key)
    prompt = ChatPromptTemplate.from_template(template=_SUMMARY_PROMPT_TEMPLATE)
    messages = prompt.format_messages(text=content, format_instructions=_FORMAT_INSTRUCTIONS)
    logging.info(messages)
    response = llm(messages)
    logging.info(response)
    # No response schemas are declared: the parser is used only to extract
    # the JSON object from the markdown code fence.
    output_parser = StructuredOutputParser.from_response_schemas([])
    output_dict = output_parser.parse(response.content)
    return output_dict


@st.cache_data
def generate_audio(text):
    """Synthesise *text* with ElevenLabs using one of two voices picked at random.

    The result is cached per text, so the voice is chosen once per distinct
    message.
    """
    audio = generate(
        text=text,
        voice=random.choice(("Matthew", "Dorothy")),
        model="eleven_monolingual_v1"
    )
    return audio


@st.cache_resource
def get_retriever(content):
    """Split *content* into chunks and index them in an in-memory Qdrant store."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=300,  # it depends on the retriever parameters and the model's context length
        chunk_overlap=20,
        length_function=len,
        is_separator_regex=False,
    )
    docs = text_splitter.create_documents([content])
    embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
    qdrant = Qdrant.from_documents(
        docs,
        embeddings,
        location=":memory:",
        collection_name="qa"
    )
    return qdrant


@st.cache_data
def _answer_question(query, documents_to_search, model_name, api_key, content, block_summaries):
    """Cached worker for ``qa``; every input the answer depends on is an argument.

    Args:
        query: The user's question.
        documents_to_search: Number of chunks to retrieve for context.
        model_name: Model identifier passed to ``get_llm``.
        api_key: OpenAI API key.
        content: Full page text the retriever is built from.
        block_summaries: Tuple of block summaries used as chat history.

    Returns:
        The model's answer text.
    """
    retriever = get_retriever(content)
    found_docs = retriever.similarity_search(query, k=documents_to_search)
    llm = get_llm(model_name=model_name, model_temperature=0, api_key=api_key)
    prompt = ChatPromptTemplate.from_template(template=_QA_PROMPT_TEMPLATE)
    chat_history = [AIMessage(content=' '.join(block_summaries))]
    messages = prompt.format_messages(context=found_docs, question=query, chat_history=chat_history)
    response = llm(messages)
    return response.content


def qa(query, documents_to_search, model_name, api_key):
    """Answer *query* about the page currently held in the session state.

    The page content and block summaries are read from ``st.session_state``
    here and handed to the cached worker explicitly, so the cache key covers
    them and an answer computed for one page is never served for another.
    """
    return _answer_question(
        query=query,
        documents_to_search=documents_to_search,
        model_name=model_name,
        api_key=api_key,
        content=st.session_state.content,
        block_summaries=tuple(st.session_state.content_block_summary),
    )


def show_audio_message(message):
    """Write *message* to the page followed by an audio player reading it."""
    st.write(message)
    content_summary_audio = generate_audio(message)
    st.audio(content_summary_audio)


def show_question_input():
    """Render the free-text question box and the example-question buttons."""

    def submit_question():
        # Move the typed text into `question` and clear the widget.
        if st.session_state.user_question_widget:
            st.session_state.question = st.session_state.user_question_widget
            st.session_state.user_question_widget = ''
        else:
            logging.info("empty user question")

    st.text_area(label="Ask your question about the content of the page:", key='user_question_widget',
                 on_change=submit_question)
    st.button("Submit")

    def on_question_button(question):
        st.session_state.question = question

    with st.expander("Example questions:"):
        for q in st.session_state.content_block_questions:
            st.button(q, on_click=on_question_button, args=[q])


def get_query_params():
    """Seed ``st.session_state.web_url`` from the ``web_url`` query parameter, if valid."""
    if 'web_url' in st.session_state:
        return
    params = st.experimental_get_query_params()
    logging.debug(f"query parameters: {params}")
    candidates = params.get('web_url')
    if not candidates:
        return
    if web_url := is_valid_web_link(candidates[0]):
        st.session_state.web_url = web_url


def show_header():
    """Show the URL under discussion and, once known, the page title."""
    if 'web_url' in st.session_state:
        col1, col2 = st.columns(2)
        col1.caption(f"discussing: {st.session_state.web_url}")
        if 'title' in st.session_state:
            col2.caption(f"{st.session_state.title}")


def get_random_page():
    """Return the URL used by the "Random Page" button (currently a fixed page)."""
    return 'https://mailchi.mp/expresso/lightpeak'


def _show_landing_page() -> None:
    """Stage 1: introduce the app and collect the link to discuss."""
    st.header("Doodle")
    st.image("./assets/doodle-img.jpg")
    st.caption(_LANDING_DESCRIPTION)
    st.divider()
    web_url = st.text_input(label='Paste your link, e.g. https://expresso.today',
                            label_visibility='collapsed',
                            placeholder='Paste your link, e.g. https://expresso.today')
    col1, _, _, _, col2 = st.columns(5)
    # The "Doodle" button only triggers a rerun; the text input carries the value.
    col1.button("Doodle")
    if col2.button("Random Page"):
        web_url = get_random_page()
    if not web_url:
        return
    if web_url := is_valid_web_link(web_url):
        st.session_state.web_url = web_url
        st.experimental_rerun()
    else:
        st.warning(_INVALID_LINK_WARNING)


def _load_web_page() -> None:
    """Stage 2: scrape the page and store its title and content in the session."""
    with st.spinner(f"reading the web page '{st.session_state.web_url}' ..."):
        st.session_state.web_page = scrape_the_article(st.session_state.web_url)
        st.session_state.title = st.session_state.web_page['title']
        st.session_state.content = st.session_state.web_page['content']
    st.experimental_rerun()


def _summarise_web_page() -> None:
    """Stage 3: summarise the page, read the summary aloud and offer questions."""
    content_summary = get_content_summary(content=st.session_state.content,
                                          model_name="gpt-3.5-turbo-16k",
                                          api_key=open_api_key)
    blocks = content_summary['blocks']
    st.session_state.content_summary = content_summary['summary']
    st.session_state.content_block_summary = [s['block_summary'] for s in blocks]
    st.session_state.content_block_questions = [s['block_question'] for s in blocks]
    show_audio_message(st.session_state.content_summary)
    show_question_input()


def _answer_pending_question() -> None:
    """Stage 4: answer the question stored in the session, then clear it."""
    question = st.session_state.question
    st.subheader(question)
    st.divider()
    with st.spinner('answering the question...'):
        answer = qa(query=question, documents_to_search=20, model_name='gpt-4', api_key=open_api_key)
    show_audio_message(answer)
    st.session_state.question = None
    show_question_input()


def main() -> None:
    """Run one pass of the app, dispatching on what the session already holds."""
    try:
        get_query_params()
        init_session()
        show_header()
        if 'web_url' not in st.session_state:
            _show_landing_page()
        elif 'content' not in st.session_state:
            _load_web_page()
        elif 'content_summary' not in st.session_state:
            _summarise_web_page()
        elif 'question' in st.session_state and st.session_state.question is not None:
            _answer_pending_question()
        else:
            show_question_input()
    except Exception:
        # Top-level boundary: keep the traceback in the log and offer a retry.
        logging.exception("unhandled error while rendering the page")
        st.warning(_ERROR_WARNING)
        if st.button("Give It Another Go!"):
            st.experimental_rerun()


if __name__ == "__main__":
    main()

# TODO:
# - connect to langsmith
# - chat history
# - store history externally along with audio description and return from cache