|
import gradio as gr |
|
from huggingface_hub import InferenceClient |
|
from typing import List, Tuple |
|
import fitz |
|
from sentence_transformers import SentenceTransformer |
|
import numpy as np |
|
import faiss |
|
from gtts import gTTS |
|
import os |
|
from PIL import Image |
|
from moviepy.editor import ImageSequenceClip |
|
|
|
# Shared Hugging Face Inference API client; every chat completion in this
# module is sent to the hosted Zephyr-7B-beta model through it.
client = InferenceClient(model="HuggingFaceH4/zephyr-7b-beta")
|
|
|
class MyApp:
    """Page-level semantic search over a single PDF.

    On construction the PDF is read page by page, every page's text is
    embedded with a sentence-transformer model, and the vectors are stored in
    a flat FAISS L2 index that ``search_documents`` queries.
    """

    _EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
    _NO_RESULTS_MESSAGE = "No relevant documents found."

    def __init__(self, pdf_path: str = "THEDIA1.pdf") -> None:
        """Load ``pdf_path`` and build the vector index for it.

        Args:
            pdf_path: PDF to index; defaults to the file the app ships with.
        """
        self.documents = []
        self.embeddings = None
        self.index = None
        # Embedding model is loaded lazily once and reused for all queries.
        self._model = None
        self.load_pdf(pdf_path)
        self.build_vector_db()

    def _get_model(self):
        """Return the shared embedding model, loading it on first use."""
        model = getattr(self, "_model", None)
        if model is None:
            model = SentenceTransformer(self._EMBEDDING_MODEL_NAME)
            self._model = model
        return model

    def load_pdf(self, file_path: str) -> None:
        """Replace ``self.documents`` with one entry per page of the PDF.

        Each entry is ``{"page": <1-based page number>, "content": <text>}``.
        """
        # The context manager closes the document even if extraction fails.
        with fitz.open(file_path) as doc:
            pages = [
                {"page": page_num, "content": page.get_text()}
                for page_num, page in enumerate(doc, start=1)
            ]
        self.documents = pages
        print("PDF processed successfully!")

    def build_vector_db(self) -> None:
        """Embed every loaded page and index the vectors with FAISS."""
        if not self.documents:
            # Nothing to embed: encoding an empty list yields no dimension
            # to size the index with, so leave the index unset.
            self.embeddings = None
            self.index = None
            return

        texts = [doc["content"] for doc in self.documents]
        embeddings = self._get_model().encode(texts, show_progress_bar=True)
        # FAISS expects contiguous float32 matrices.
        self.embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
        self.index = faiss.IndexFlatL2(self.embeddings.shape[1])
        self.index.add(self.embeddings)
        print("Vector database built successfully!")

    def search_documents(self, query: str, k: int = 3) -> List[str]:
        """Return the text of up to ``k`` pages closest to ``query``.

        Args:
            query: Free-text search string.
            k: Maximum number of pages to return.

        Returns:
            Page texts ordered as returned by the index, or a single
            "no results" message when nothing can be retrieved.
        """
        index = getattr(self, "index", None)
        if index is None or not self.documents or k <= 0:
            return [self._NO_RESULTS_MESSAGE]

        query_embedding = self._get_model().encode([query], show_progress_bar=False)
        query_vectors = np.ascontiguousarray(query_embedding, dtype=np.float32)
        _, neighbour_ids = index.search(query_vectors, k)

        # FAISS pads with -1 when fewer than k vectors exist; without this
        # filter -1 would index the *last* page and fake a match.
        page_count = len(self.documents)
        results = [
            self.documents[i]["content"]
            for i in neighbour_ids[0]
            if 0 <= i < page_count
        ]
        return results or [self._NO_RESULTS_MESSAGE]
|
|
|
# Module-level instance: constructing it loads the PDF and builds the vector
# index once, at import time; the chat handler queries this same object.
app: MyApp = MyApp()
|
|
|
def preprocess_response(response: str) -> str:
    """Normalise whitespace/punctuation and ensure a supportive opening.

    Args:
        response: Raw text produced by the chat model.

    Returns:
        The text with all whitespace runs (including newlines) collapsed to
        single spaces, no space before commas or full stops, and prefixed
        with "I'm here to help. " unless it already contains one of the
        words "sorry", "apologize" or "empathy".
    """
    # Collapse whitespace FIRST so that at most one space can precede a
    # punctuation mark; doing it afterwards left "word ," for inputs such as
    # "word  ," or "word\n,".
    response = " ".join(response.split())
    response = response.replace(" ,", ",").replace(" .", ".")

    empathy_markers = ("sorry", "apologize", "empathy")
    lowered = response.lower()
    if not any(marker in lowered for marker in empathy_markers):
        response = "I'm here to help. " + response
    return response
|
|
|
def shorten_response(response: str) -> str:
    """Ask the chat model for a shorter, more supportive rewrite of ``response``.

    Sends a two-message conversation (rewrite instruction + the text) through
    the module-level ``client`` and returns the first choice's content with
    surrounding whitespace stripped.
    """
    instruction = {
        "role": "system",
        "content": "Shorten and refine this response in a supportive and empathetic manner.",
    }
    text_to_rewrite = {"role": "user", "content": response}

    completion = client.chat_completion(
        [instruction, text_to_rewrite],
        max_tokens=512,
        temperature=0.5,
        top_p=0.9,
    )
    first_choice = completion.choices[0]
    return first_choice.message['content'].strip()
|
|
|
def text_to_speech(text: str, lang: str = 'en'):
    """Synthesise ``text`` with gTTS and return the path of the saved MP3.

    The audio is always written to ``response.mp3`` in the working
    directory, overwriting any previous file of that name.
    """
    output_file = "response.mp3"
    gTTS(text=text, lang=lang, slow=False).save(output_file)
    return output_file
|
|
|
def create_speaking_avatar(image_path: str, audio_path: str):
    """Render a still-image video of the avatar with the speech as its audio.

    Args:
        image_path: Path of the avatar image shown for the whole video.
        audio_path: Path of the audio file to attach.

    Returns:
        Path of the written video file (always ``output.mp4``).
    """
    # Local import: the module header only imports ImageSequenceClip.
    from moviepy.editor import AudioFileClip

    fps = 30
    output_path = "output.mp4"

    with Image.open(image_path) as image:
        # Force an (H, W, 3) array; palette or greyscale images would
        # otherwise produce frames the encoder cannot interpret.
        frame = np.array(image.convert("RGB"))

    # set_audio needs an audio clip object, not a file path: a bare string
    # has no audio-writing methods and fails when the video is written.
    audio = AudioFileClip(audio_path)
    try:
        # Repeat the still frame for the whole audio duration instead of a
        # fixed 30 frames (1 second); the list holds references, not copies.
        frame_count = max(1, int(audio.duration * fps) + 1)
        clip = ImageSequenceClip([frame] * frame_count, fps=fps)
        clip = clip.set_audio(audio)
        clip.write_videofile(output_path, codec="libx264")
    finally:
        # Release the audio reader opened by AudioFileClip.
        audio.close()

    return output_path
|
|
|
# Substrings that make a user message trigger document retrieval.
_RETRIEVAL_KEYWORDS = ("exercise", "technique", "information", "guide", "help", "how to")


def respond(message: str, history: List[Tuple[str, str]]):
    """Produce the assistant's reply plus its audio and avatar video.

    Args:
        message: The user's new chat message.
        history: Previous ``(user, assistant)`` turns; ``None`` is treated as
            an empty conversation. The list is extended in place.

    Returns:
        A 4-tuple ``(history, "", audio_path, avatar_video_path)``: the
        updated history, an empty string to clear the input box, and the
        media file paths (both ``None`` when the reply is empty).
    """
    if history is None:
        history = []

    system_message = "You are a supportive and empathetic Dialectical Behaviour Therapist assistant. You politely guide users through DBT exercises based on the given DBT book. You must say one thing at a time and ask follow-up questions to continue the chat."
    messages = [{"role": "system", "content": system_message}]

    # Replay earlier turns, skipping empty halves of a turn.
    for user_turn, assistant_turn in history:
        if user_turn:
            messages.append({"role": "user", "content": user_turn})
        if assistant_turn:
            messages.append({"role": "assistant", "content": assistant_turn})

    messages.append({"role": "user", "content": message})

    # Only consult the document index when the message looks like a request
    # for guidance.
    lowered_message = message.lower()
    if any(keyword in lowered_message for keyword in _RETRIEVAL_KEYWORDS):
        retrieved_docs = app.search_documents(message)
        context = "\n".join(retrieved_docs)
        if context.strip():
            messages.append({"role": "system", "content": "Relevant documents: " + context})

    response = client.chat_completion(messages, max_tokens=1024, temperature=0.7, top_p=0.9)
    # Skip choices whose content is missing or None; joining None would
    # raise TypeError.
    response_content = "".join(
        choice.message['content']
        for choice in response.choices
        if 'content' in choice.message and choice.message['content']
    )

    polished_response = preprocess_response(response_content)
    shortened_response = shorten_response(polished_response)

    history.append((message, shortened_response))

    audio_path = None
    avatar_video_path = None
    # gTTS refuses empty text, so only synthesise media for a non-empty reply.
    if shortened_response.strip():
        audio_path = text_to_speech(shortened_response)
        avatar_video_path = create_speaking_avatar("avatar.png", audio_path)

    return history, "", audio_path, avatar_video_path
|
|
|
# Gradio UI: header, chat window, input row, example prompts and wiring.
# NOTE(review): the original indentation was lost; the two buttons are assumed
# to sit in the same row as the textbox — confirm against the deployed layout.
with gr.Blocks() as demo:
    gr.Markdown("# 🧘♀️ **Dialectical Behaviour Therapy**")
    disclaimer_text = (
        "‼️Disclaimer: This chatbot is based on a DBT exercise book that is publicly available. "
        "We are not medical practitioners, and the use of this chatbot is at your own responsibility."
    )
    gr.Markdown(disclaimer_text)

    chatbot = gr.Chatbot()

    with gr.Row():
        txt_input = gr.Textbox(show_label=False, placeholder="Type your message here...", lines=1)
        submit_btn = gr.Button("Submit", scale=1)
        refresh_btn = gr.Button("Refresh Chat", scale=1, variant="secondary")

    # Each example is a one-element row because the examples feed one input.
    example_questions = [
        ["What are some techniques to handle distressing situations?"],
        ["How does DBT help with emotional regulation?"],
        ["Can you give me an example of an interpersonal effectiveness skill?"],
        ["I want to practice mindfulness. Can you help me?"],
    ]
    gr.Examples(examples=example_questions, inputs=[txt_input])

    # Output players are created here, after the examples, which is the same
    # point at which the original instantiated them inline.
    audio_output = gr.Audio()
    video_output = gr.Video()

    submit_btn.click(
        fn=respond,
        inputs=[txt_input, chatbot],
        outputs=[chatbot, txt_input, audio_output, video_output],
    )
    # Returning an empty list clears the chat history.
    refresh_btn.click(fn=lambda: [], inputs=None, outputs=chatbot)


if __name__ == "__main__":
    demo.launch()
|
|