File size: 5,898 Bytes
50137a9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import gradio as gr
from huggingface_hub import InferenceClient
from typing import List, Tuple
import fitz  # PyMuPDF
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
from gtts import gTTS
import os
from PIL import Image
from moviepy.editor import ImageSequenceClip

# Shared Hugging Face inference client used by the chat-completion calls below.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")

class MyApp:
    """Page-level semantic search over a single PDF.

    On construction the PDF ``THEDIA1.pdf`` is read page by page, each page's
    text is embedded with a sentence-transformers model, and the embeddings
    are stored in a FAISS L2 index that ``search_documents`` queries.
    """

    # Model used for both page and query embeddings; they must match.
    EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'

    def __init__(self) -> None:
        self.documents = []
        self.embeddings = None
        self.index = None
        # Loaded lazily and reused; loading the model is far more expensive
        # than encoding a single query.
        self._model = None
        self.load_pdf("THEDIA1.pdf")
        self.build_vector_db()

    def _get_model(self):
        """Return the shared embedding model, loading it on first use."""
        if self._model is None:
            self._model = SentenceTransformer(self.EMBEDDING_MODEL_NAME)
        return self._model

    def load_pdf(self, file_path: str) -> None:
        """Replace ``self.documents`` with one entry per page of the PDF.

        Each entry is ``{"page": <1-based page number>, "content": <text>}``.

        Args:
            file_path: Path of the PDF to read.
        """
        # The context manager closes the PDF handle even if extraction fails.
        with fitz.open(file_path) as doc:
            self.documents = [
                {"page": page_num, "content": page.get_text()}
                for page_num, page in enumerate(doc, start=1)
            ]
        print("PDF processed successfully!")

    def build_vector_db(self) -> None:
        """Embed every loaded page and (re)build the FAISS index."""
        if not self.documents:
            # Nothing to index; leave the index unset so searches fall back.
            self.embeddings = None
            self.index = None
            return
        texts = [doc["content"] for doc in self.documents]
        encoded = self._get_model().encode(texts, show_progress_bar=True)
        # FAISS expects a C-contiguous float32 matrix.
        self.embeddings = np.ascontiguousarray(encoded, dtype=np.float32)
        self.index = faiss.IndexFlatL2(self.embeddings.shape[1])
        self.index.add(self.embeddings)
        print("Vector database built successfully!")

    def search_documents(self, query: str, k: int = 3) -> List[str]:
        """Return the text of the pages closest to ``query``.

        Args:
            query: Free-text search query.
            k: Maximum number of pages to return.

        Returns:
            Up to ``k`` page texts, nearest first, or a single-item list with
            a "no relevant documents" message when nothing can be returned.
        """
        fallback = ["No relevant documents found."]
        if self.index is None or not self.documents or k <= 0:
            return fallback

        # FAISS pads missing neighbours with -1, which would silently pick the
        # last page via negative indexing; never ask for more than we have.
        k = min(k, len(self.documents))
        encoded = self._get_model().encode([query], show_progress_bar=False)
        query_embedding = np.ascontiguousarray(encoded, dtype=np.float32)
        _, neighbours = self.index.search(query_embedding, k)
        results = [
            self.documents[i]["content"]
            for i in neighbours[0]
            if 0 <= i < len(self.documents)
        ]
        return results or fallback

# Module-level instance; constructing it loads the PDF and builds the vector
# index, so this work happens at import time.
app = MyApp()

def preprocess_response(response: str) -> str:
    """Normalise whitespace and punctuation spacing in a model reply.

    All runs of whitespace (including newlines) are collapsed to single
    spaces, a space directly before a comma or full stop is removed, and a
    short supportive opener is prepended unless the text already contains
    one of the words "sorry", "apologize" or "empathy".

    Args:
        response: Raw text returned by the model.

    Returns:
        The cleaned-up text.
    """
    # Collapse whitespace first: the punctuation fix below only matches a
    # single space, so "word  ," or "word\n." would otherwise slip through.
    response = " ".join(response.split())
    response = response.replace(" ,", ",").replace(" .", ".")

    lowered = response.lower()
    if not any(word in lowered for word in ("sorry", "apologize", "empathy")):
        response = "I'm here to help. " + response
    return response

def shorten_response(response: str) -> str:
    """Ask the chat model for a shorter, more empathetic rewrite of ``response``.

    Args:
        response: The text to condense.

    Returns:
        The content of the first completion choice, stripped of surrounding
        whitespace.
    """
    instruction = {
        "role": "system",
        "content": "Shorten and refine this response in a supportive and empathetic manner.",
    }
    draft = {"role": "user", "content": response}
    completion = client.chat_completion(
        [instruction, draft],
        max_tokens=512,
        temperature=0.5,
        top_p=0.9,
    )
    first_choice = completion.choices[0]
    return first_choice.message['content'].strip()

def text_to_speech(text: str, lang: str = 'en'):
    """Synthesise ``text`` with gTTS and save it as ``response.mp3``.

    Args:
        text: The text to speak.
        lang: Language code passed to gTTS.

    Returns:
        The path of the written MP3 file (always ``"response.mp3"``, so each
        call overwrites the previous output).
    """
    output_file = "response.mp3"
    gTTS(text=text, lang=lang, slow=False).save(output_file)
    return output_file

def create_speaking_avatar(image_path: str, audio_path: str):
    """Render a video of a still image shown for the length of an audio track.

    Args:
        image_path: Path of the avatar image.
        audio_path: Path of the audio file to use as the soundtrack.

    Returns:
        The path of the written video (always ``"output.mp4"``, so each call
        overwrites the previous output).
    """
    # Local import: the file-level import only brings ImageSequenceClip into
    # scope, and set_audio() needs an audio clip object, not a file path.
    from moviepy.editor import AudioFileClip

    fps = 30
    output_path = "output.mp4"

    # Normalise to RGB so palette, greyscale and alpha images all yield an
    # (H, W, 3) frame; the context manager releases the image file handle.
    with Image.open(image_path) as image:
        frame = np.array(image.convert("RGB"))

    audio = AudioFileClip(audio_path)
    try:
        # Show the image for as long as the audio lasts rather than a fixed
        # one second, so the speech is not cut off.
        n_frames = max(1, int(np.ceil(audio.duration * fps)))
        # Every entry references the same array, so this stays cheap.
        clip = ImageSequenceClip([frame] * n_frames, fps=fps)
        clip = clip.set_audio(audio)
        clip.write_videofile(output_path, codec="libx264")
    finally:
        # Release the ffmpeg reader that backs the audio clip.
        audio.close()
    return output_path

def respond(message: str, history: List[Tuple[str, str]]):
    """Produce the assistant's reply to ``message`` plus audio and video for it.

    The conversation so far is replayed to the chat model. When the message
    contains one of a few trigger keywords, passages retrieved from the
    indexed book are added as extra context. The reply is cleaned up,
    shortened by a second model call, spoken to an MP3, and rendered as an
    avatar video.

    Args:
        message: The user's latest message.
        history: Previous ``(user, assistant)`` turns; may be ``None`` or empty.

    Returns:
        A 4-tuple ``(history, "", audio_path, video_path)``: the updated
        history, an empty string, and the generated media paths. For a blank
        message the history is returned unchanged and both paths are ``None``.
    """
    # Copy so the caller's list is never mutated; also tolerates a None history.
    history = list(history or [])

    # A blank submission has nothing to answer; skip the model, TTS and video work.
    if not message or not message.strip():
        return history, "", None, None

    system_message = "You are a supportive and empathetic Dialectical Behaviour Therapist assistant. You politely guide users through DBT exercises based on the given DBT book. You must say one thing at a time and ask follow-up questions to continue the chat."
    messages = [{"role": "system", "content": system_message}]

    for user_turn, assistant_turn in history:
        if user_turn:
            messages.append({"role": "user", "content": user_turn})
        if assistant_turn:
            messages.append({"role": "assistant", "content": assistant_turn})

    messages.append({"role": "user", "content": message})

    # Only consult the book when the user appears to be asking for guidance.
    retrieval_keywords = ("exercise", "technique", "information", "guide", "help", "how to")
    lowered = message.lower()
    if any(keyword in lowered for keyword in retrieval_keywords):
        retrieved_docs = app.search_documents(message)
        context = "\n".join(retrieved_docs)
        if context.strip():
            messages.append({"role": "system", "content": "Relevant documents: " + context})

    response = client.chat_completion(messages, max_tokens=1024, temperature=0.7, top_p=0.9)
    response_content = "".join(
        choice.message['content'] for choice in response.choices if 'content' in choice.message
    )

    polished_response = preprocess_response(response_content)
    shortened_response = shorten_response(polished_response)

    history.append((message, shortened_response))

    # Convert response text to speech and create the speaking avatar
    audio_path = text_to_speech(shortened_response)
    avatar_video_path = create_speaking_avatar("avatar.png", audio_path)

    return history, "", audio_path, avatar_video_path

with gr.Blocks() as demo:
    # Page title and usage disclaimer.
    gr.Markdown("# 🧘‍♀️ **Dialectical Behaviour Therapy**")
    gr.Markdown(
        "‼️Disclaimer: This chatbot is based on a DBT exercise book that is publicly available. "
        "We are not medical practitioners, and the use of this chatbot is at your own responsibility."
    )

    # Conversation display.
    chatbot = gr.Chatbot()

    # Input row: message box plus the submit and reset buttons.
    with gr.Row():
        txt_input = gr.Textbox(show_label=False, placeholder="Type your message here...", lines=1)
        submit_btn = gr.Button("Submit", scale=1)
        refresh_btn = gr.Button("Refresh Chat", scale=1, variant="secondary")

    # Clickable sample prompts that fill the message box.
    example_questions = [
        ["What are some techniques to handle distressing situations?"],
        ["How does DBT help with emotional regulation?"],
        ["Can you give me an example of an interpersonal effectiveness skill?"],
        ["I want to practice mindfulness. Can you help me?"],
    ]
    gr.Examples(examples=example_questions, inputs=[txt_input])

    # Players for the spoken reply and the avatar video; created here so they
    # appear below the examples.
    audio_output = gr.Audio()
    video_output = gr.Video()

    # Submit sends the message and history to respond(); its four return
    # values map onto the four outputs in order.
    submit_btn.click(
        fn=respond,
        inputs=[txt_input, chatbot],
        outputs=[chatbot, txt_input, audio_output, video_output],
    )
    # Refresh clears the conversation display.
    refresh_btn.click(fn=lambda: [], inputs=None, outputs=chatbot)

# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()