# Gradio DBT chatbot app (header metadata from extraction removed)
# Standard library
import os
from typing import List, Tuple

# Third-party
import faiss
import fitz  # PyMuPDF
import gradio as gr
import numpy as np
from gtts import gTTS
from huggingface_hub import InferenceClient
from moviepy.editor import ImageSequenceClip
from moviepy.editor import AudioFileClip  # needed by create_speaking_avatar
from PIL import Image
from sentence_transformers import SentenceTransformer
# Shared chat-completion client for the Zephyr-7B model; used by
# shorten_response() and respond() below.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
class MyApp:
    """Load a DBT exercise PDF and serve semantic search over its pages.

    On construction this reads "THEDIA1.pdf" page-by-page, embeds every
    page with a sentence-transformer, and builds an in-memory FAISS index
    for nearest-neighbour retrieval.
    """

    def __init__(self) -> None:
        self.documents: List[dict] = []   # [{"page": int, "content": str}, ...]
        self.embeddings = None            # ndarray, one row per page
        self.index = None                 # faiss.IndexFlatL2 over self.embeddings
        # Load the encoder once and reuse it. Previously a fresh
        # SentenceTransformer was instantiated on every search call,
        # which is very slow and wasteful.
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.load_pdf("THEDIA1.pdf")
        self.build_vector_db()

    def load_pdf(self, file_path: str) -> None:
        """Extract text from every page of *file_path* into self.documents."""
        doc = fitz.open(file_path)
        try:
            self.documents = [
                {"page": page_num + 1, "content": doc[page_num].get_text()}
                for page_num in range(len(doc))
            ]
        finally:
            doc.close()  # previously leaked the document handle
        print("PDF processed successfully!")

    def build_vector_db(self) -> None:
        """Embed all loaded pages and build an L2 FAISS index over them."""
        self.embeddings = self.model.encode(
            [d["content"] for d in self.documents], show_progress_bar=True
        )
        self.index = faiss.IndexFlatL2(self.embeddings.shape[1])
        self.index.add(np.array(self.embeddings))
        print("Vector database built successfully!")

    def search_documents(self, query: str, k: int = 3) -> List[str]:
        """Return up to *k* page texts most similar to *query*.

        Falls back to a single "not found" message when nothing matches.
        """
        query_embedding = self.model.encode([query], show_progress_bar=False)
        _, neighbor_ids = self.index.search(np.array(query_embedding), k)
        # FAISS pads the result with -1 when the index holds fewer than k
        # vectors; filter those out instead of indexing documents[-1].
        results = [
            self.documents[i]["content"] for i in neighbor_ids[0] if i != -1
        ]
        return results if results else ["No relevant documents found."]
# Module-level singleton: loads the PDF and builds the vector index at
# import time (so startup is slow but per-request search is fast).
app = MyApp()
def preprocess_response(response: str) -> str:
    """Normalize whitespace and spaced punctuation in *response*, then
    prepend a supportive opener unless it already sounds empathetic."""
    cleaned = response.strip()
    # Collapse double newlines and detach spaces glued onto punctuation.
    for old, new in (("\n\n", "\n"), (" ,", ","), (" .", ".")):
        cleaned = cleaned.replace(old, new)
    # Squash any remaining whitespace runs into single spaces.
    cleaned = " ".join(cleaned.split())
    lowered = cleaned.lower()
    if all(word not in lowered for word in ("sorry", "apologize", "empathy")):
        cleaned = "I'm here to help. " + cleaned
    return cleaned
def shorten_response(response: str) -> str:
    """Ask the LLM to condense *response* while keeping a supportive tone."""
    prompt = [
        {
            "role": "system",
            "content": "Shorten and refine this response in a supportive and empathetic manner.",
        },
        {"role": "user", "content": response},
    ]
    completion = client.chat_completion(
        prompt, max_tokens=512, temperature=0.5, top_p=0.9
    )
    return completion.choices[0].message['content'].strip()
def text_to_speech(text: str, lang: str = 'en', output_path: str = "response.mp3") -> str:
    """Synthesize *text* to an MP3 file via gTTS and return its path.

    Args:
        text: Text to speak.
        lang: gTTS language code.
        output_path: Where to save the MP3. Defaults to the previously
            hard-coded "response.mp3" so existing callers are unaffected.

    Returns:
        The path of the saved audio file.
    """
    tts = gTTS(text=text, lang=lang, slow=False)
    tts.save(output_path)
    return output_path
def create_speaking_avatar(image_path: str, audio_path: str) -> str:
    """Render a static "speaking avatar" video: the image held on screen
    for the full duration of the audio, with the audio attached.

    Args:
        image_path: Path to the avatar still image.
        audio_path: Path to the narration audio file.

    Returns:
        Path of the written MP4 ("output.mp4").
    """
    # BUG FIX: moviepy's set_audio() requires an AudioClip object; the
    # original passed the raw path string, which fails at write time.
    audio = AudioFileClip(audio_path)
    frame = np.array(Image.open(image_path))
    fps = 30
    # BUG FIX: the video was hard-coded to 1 second (30 frames), cutting
    # off any narration longer than that; size it to the audio instead.
    n_frames = max(1, int(audio.duration * fps))
    clip = ImageSequenceClip([frame] * n_frames, fps=fps)
    clip = clip.set_audio(audio)
    output_path = "output.mp4"
    clip.write_videofile(output_path, codec="libx264", audio_codec="aac")
    return output_path
def respond(message: str, history: List[Tuple[str, str]]):
    """Handle one chat turn: build the prompt (with optional RAG context),
    query the model, post-process the reply, and produce audio + avatar
    video. Returns (history, cleared_textbox, audio_path, video_path)."""
    system_message = "You are a supportive and empathetic Dialectical Behaviour Therapist assistant. You politely guide users through DBT exercises based on the given DBT book. You must say one thing at a time and ask follow-up questions to continue the chat."
    messages = [{"role": "system", "content": system_message}]
    # Replay prior (user, assistant) pairs so the model sees full context.
    for val in history:
        if val[0]:
            messages.append({"role": "user", "content": val[0]})
        if val[1]:
            messages.append({"role": "assistant", "content": val[1]})
    messages.append({"role": "user", "content": message})
    # Keyword-triggered RAG: only consult the PDF index when the user
    # appears to ask for exercises or guidance.
    if any(keyword in message.lower() for keyword in ["exercise", "technique", "information", "guide", "help", "how to"]):
        retrieved_docs = app.search_documents(message)
        context = "\n".join(retrieved_docs)
        if context.strip():
            messages.append({"role": "system", "content": "Relevant documents: " + context})
    response = client.chat_completion(messages, max_tokens=1024, temperature=0.7, top_p=0.9)
    # NOTE(review): concatenates every returned choice's content —
    # presumably only one choice comes back; confirm against the
    # chat_completion defaults.
    response_content = "".join([choice.message['content'] for choice in response.choices if 'content' in choice.message])
    polished_response = preprocess_response(response_content)
    shortened_response = shorten_response(polished_response)
    history.append((message, shortened_response))
    # Convert response text to speech and create the speaking avatar
    audio_path = text_to_speech(shortened_response)
    avatar_video_path = create_speaking_avatar("avatar.png", audio_path)
    # Empty string clears the input textbox in the Gradio UI.
    return history, "", audio_path, avatar_video_path
# Gradio UI: chatbot pane, message input, submit/refresh buttons, and
# example prompts. respond() also emits audio and an avatar video.
with gr.Blocks() as demo:
    gr.Markdown("# 🧘♀️ **Dialectical Behaviour Therapy**")
    gr.Markdown(
        "‼️Disclaimer: This chatbot is based on a DBT exercise book that is publicly available. "
        "We are not medical practitioners, and the use of this chatbot is at your own responsibility."
    )
    chatbot = gr.Chatbot()
    with gr.Row():
        txt_input = gr.Textbox(
            show_label=False,
            placeholder="Type your message here...",
            lines=1
        )
        submit_btn = gr.Button("Submit", scale=1)
        refresh_btn = gr.Button("Refresh Chat", scale=1, variant="secondary")
    example_questions = [
        ["What are some techniques to handle distressing situations?"],
        ["How does DBT help with emotional regulation?"],
        ["Can you give me an example of an interpersonal effectiveness skill?"],
        ["I want to practice mindfulness. Can you help me?"],
    ]
    gr.Examples(examples=example_questions, inputs=[txt_input])
    # Outputs: updated chat, cleared textbox, plus Audio/Video components
    # created inline to display the synthesized speech and avatar clip.
    submit_btn.click(fn=respond, inputs=[txt_input, chatbot], outputs=[chatbot, txt_input, gr.Audio(), gr.Video()])
    # Resetting the Chatbot value to an empty list clears the history.
    refresh_btn.click(lambda: [], None, chatbot)

if __name__ == "__main__":
    demo.launch()