Spaces:
Running
Running
File size: 5,935 Bytes
03c0888 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# Make sure to install the required packages: chainlit and groq
import os, time
from openai import AsyncOpenAI
import chainlit as cl
import re
import requests
from io import BytesIO
from chainlit.element import ElementBased
from groq import Groq
# Import threadpools to run the crawl_url function in a separate thread
from concurrent.futures import ThreadPoolExecutor
# Async OpenAI client pointed at Groq's `/openai/v1` endpoint; the API key is
# read from the GROQ_API_KEY environment variable.
client = AsyncOpenAI(
    api_key=os.getenv("GROQ_API_KEY"),
    base_url="https://api.groq.com/openai/v1",
)

# Instrument the OpenAI client
cl.instrument_openai()
# Keyword arguments unpacked into the chat-completion request.
settings = dict(
    model="llama3-8b-8192",
    temperature=0.5,
    max_tokens=500,
    top_p=1,
    frequency_penalty=0,
    presence_penalty=0,
)
def extract_urls(text):
    """Return every http(s) URL found in *text*, in order of appearance.

    A URL is taken to be ``http://`` or ``https://`` followed by a run of
    non-whitespace characters. Because URLs are usually embedded in prose,
    trailing sentence punctuation (``. , ; : ! ? ' "``) is stripped, and a
    trailing closing bracket is stripped only when it has no matching opening
    bracket inside the URL (so ``.../wiki/Foo_(bar)`` is kept intact while
    ``(https://example.com)`` yields ``https://example.com``).

    Args:
        text: The string to scan.

    Returns:
        A list of URL strings; empty when *text* contains none.
    """
    trailing_punctuation = ".,;:!?'\""
    closing_to_opening = {")": "(", "]": "[", "}": "{", ">": "<"}

    urls = []
    for candidate in re.findall(r'(https?://\S+)', text):
        while candidate:
            last = candidate[-1]
            if last in trailing_punctuation:
                candidate = candidate[:-1]
            elif (
                last in closing_to_opening
                and candidate.count(last) > candidate.count(closing_to_opening[last])
            ):
                # Unbalanced closer: it belongs to the surrounding text.
                candidate = candidate[:-1]
            else:
                break
        # Skip matches that were nothing but a scheme plus punctuation.
        if candidate.partition("://")[2]:
            urls.append(candidate)
    return urls
def crawl_url(url, timeout=30):
    """Fetch *url* through the crawl4ai.com service and return its markdown.

    Sends a blocking POST request to ``https://crawl4ai.com/crawl`` and
    returns the ``markdown`` field of the first entry in the response's
    ``results`` list.

    Args:
        url: The page to crawl.
        timeout: Seconds to wait for the service before giving up. Without a
            timeout ``requests`` waits indefinitely on an unresponsive server.

    Returns:
        The ``markdown`` value of the first crawl result.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-2xx HTTP status.
        KeyError, IndexError: If the response lacks the expected
            ``results[0]["markdown"]`` structure.
    """
    payload = {
        "urls": [url],
        "include_raw_html": True,
        "word_count_threshold": 10,
        "extraction_strategy": "NoExtractionStrategy",
        "chunking_strategy": "RegexChunking",
    }
    response = requests.post(
        "https://crawl4ai.com/crawl", json=payload, timeout=timeout
    )
    # Fail with a clear HTTP error instead of an opaque JSON/KeyError later.
    response.raise_for_status()
    first_result = response.json()['results'][0]
    return first_result['markdown']
@cl.on_chat_start
async def on_chat_start():
    """Store an empty history/context under the "session" key and greet the user."""
    initial_state = {"history": [], "context": {}}
    cl.user_session.set("session", initial_state)

    greeting = cl.Message(content="Welcome to the chat! How can I assist you today?")
    await greeting.send()
def _build_system_message(context):
    """Build the system message, embedding every crawled page as an appendix.

    Args:
        context: Mapping of reference id (``"REF_<n>"``) to a dict with
            ``"url"`` and ``"content"`` keys.

    Returns:
        A ``{"role": "system", "content": ...}`` dict. When *context* is empty
        the content is a plain assistant prompt without citation rules.
    """
    appendices = [
        f'<appendix ref="{ref}">\n{data["content"]}\n</appendix>'
        for ref, data in context.items()
    ]
    if not appendices:
        return {"role": "system", "content": "You are a helpful assistant."}

    instructions = (
        "You are a helpful bot. Use the following context for answering questions. "
        "Refer to the sources using the REF number in square brackets, e.g., [1], only if the source is given in the appendices below.\n\n"
        "If the question requires any information from the provided appendices or context, refer to the sources. "
        "If not, there is no need to add a references section. "
        "At the end of your response, provide a reference section listing the URLs and their REF numbers only if sources from the appendices were used.\n\n"
    )
    # The join must be applied to its own "\n\n" literal. Writing it directly
    # after the instruction literals makes Python concatenate them all first,
    # so the instructions become the join *separator* (and vanish entirely
    # when there is only one appendix).
    return {"role": "system", "content": instructions + "\n\n".join(appendices)}


@cl.on_message
async def on_message(message: cl.Message):
    """Handle a user message: crawl any URLs in it, query the LLM, stream the reply.

    Steps:
      1. Crawl every URL found in the message and store each successful
         result in the session context under the next ``REF_<n>`` id.
      2. Append the user message to the session history.
      3. Stream the model's answer into a new chat message and record it in
         the history.
      4. Append a "References" list of all known sources to the displayed
         message (only when at least one source exists).

    Args:
        message: The incoming chat message; only ``message.content`` is read.
    """
    # Function-scope import so this block does not depend on the file header.
    import asyncio

    user_session = cl.user_session.get("session")

    # Extract URLs from the user's message
    urls = extract_urls(message.content)
    if urls:
        # crawl_url is blocking; run it in worker threads and await the
        # results so the event loop stays responsive while pages download.
        crawl_results = await asyncio.gather(
            *(asyncio.to_thread(crawl_url, url) for url in urls),
            return_exceptions=True,
        )
        for url, result in zip(urls, crawl_results):
            if isinstance(result, Exception):
                # One unreachable page should not abort the whole reply.
                await cl.Message(content=f"Could not fetch {url}: {result}").send()
                continue
            ref_number = f"REF_{len(user_session['context']) + 1}"
            user_session["context"][ref_number] = {"url": url, "content": result}

    user_session["history"].append({
        "role": "user",
        "content": message.content,
    })

    system_message = _build_system_message(user_session["context"])

    msg = cl.Message(content="")
    await msg.send()

    # Get response from the LLM
    stream = await client.chat.completions.create(
        messages=[system_message, *user_session["history"]],
        stream=True,
        **settings,
    )

    tokens = []
    async for part in stream:
        # Some stream chunks may carry no choices; skip them.
        if part.choices and (token := part.choices[0].delta.content):
            tokens.append(token)
            await msg.stream_token(token)
    assistant_response = "".join(tokens)

    # Add assistant message to the history (without the appended references)
    user_session["history"].append({
        "role": "assistant",
        "content": assistant_response,
    })

    # Append the reference section to the displayed response, but only when
    # there is at least one source to list.
    if user_session["context"]:
        reference_lines = "".join(
            f"[{ref.split('_')[1]}]: {data['url']}\n"
            for ref, data in user_session["context"].items()
        )
        msg.content += "\n\nReferences:\n" + reference_lines
    await msg.update()
@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.AudioChunk):
    """Accumulate incoming audio chunks in a per-session in-memory buffer.

    On the first chunk of a stream (``chunk.isStart``) a fresh ``BytesIO`` is
    created, named after the MIME subtype, and stored in the user session
    together with the MIME type. Every chunk's data is then appended to the
    buffer stored in the session.

    Args:
        chunk: The audio chunk; ``isStart``, ``mimeType`` and ``data`` are read.
    """
    if chunk.isStart:
        # Use only the bare subtype as the file extension: drop any MIME
        # parameters (e.g. "audio/webm;codecs=opus" -> "webm").
        subtype = chunk.mimeType.split("/")[1].split(";")[0].strip()
        buffer = BytesIO()
        # This is required for whisper to recognize the file type
        buffer.name = f"input_audio.{subtype}"
        # Initialize the session for a new audio stream
        cl.user_session.set("audio_buffer", buffer)
        cl.user_session.set("audio_mime_type", chunk.mimeType)

    # Write the chunks to a buffer and transcribe the whole audio at the end
    cl.user_session.get("audio_buffer").write(chunk.data)
@cl.step(type="tool")
async def speech_to_text(audio_file):
    """Transcribe *audio_file* with the ``whisper-large-v3`` model.

    The request goes through the module-level async ``client``; no separate
    client is constructed here.

    Args:
        audio_file: Value passed as the ``file`` argument of the
            transcription request. ``on_audio_end`` supplies a
            ``(filename, bytes, mime_type)`` tuple.

    Returns:
        The ``text`` attribute of the transcription response.
    """
    response = await client.audio.transcriptions.create(
        model="whisper-large-v3", file=audio_file
    )
    return response.text
@cl.on_audio_end
async def on_audio_end(elements: list[ElementBased]):
    """Transcribe the buffered recording and route the text through on_message.

    Reads the audio buffer and MIME type stored in the user session, sends
    them to ``speech_to_text``, prints how long the transcription took, posts
    the transcript as a user message and hands that message to ``on_message``.

    Args:
        elements: Not used by this handler.
    """
    # Rewind the session buffer and pull out everything recorded so far.
    recorded: BytesIO = cl.user_session.get("audio_buffer")
    recorded.seek(0)
    payload = recorded.read()
    mime_type: str = cl.user_session.get("audio_mime_type")

    started = time.time()
    text = await speech_to_text((recorded.name, payload, mime_type))
    finished = time.time()
    print(f"Transcription took {finished - started} seconds")

    # Show the transcript as if the user had typed it, then answer it.
    user_msg = cl.Message(author="You", type="user_message", content=text)
    await user_msg.send()
    await on_message(user_msg)
if __name__ == "__main__":
    # Allow `python <this file>` to start the app by handing the file to
    # Chainlit's own runner.
    from chainlit.cli import run_chainlit as _launch

    _launch(__file__)
|