# Spaces: Sleeping
import gradio as gr | |
import random | |
import time | |
import os | |
import requests | |
import base64 | |
import random | |
import pymongo | |
import certifi | |
# Secrets are read from the environment so they never live in source control.
token = os.getenv("runpod_key")  # RunPod API key, sent as a Bearer token by LLM_call

# MongoDB connection string (it embeds the database username and password).
# It used to be hard-coded at this spot; any credential that was ever committed
# should be rotated.
uri = os.getenv("mongodb_uri")
if not uri:
    raise RuntimeError(
        "The 'mongodb_uri' environment variable must be set to the MongoDB "
        "connection string."
    )

# Create a new client and connect to the server, using certifi's CA bundle
# for TLS.
client = pymongo.MongoClient(uri, tlsCAFile=certifi.where())

# Send a ping to confirm a successful connection. A failure is only printed;
# the module keeps loading either way.
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

# Access your database: conversations live in the 'info' collection of 'camila'.
db = client.get_database('camila')
records = db.info

# ID of the current conversation. It is a single module-level value, advanced
# by generate_unique_id() and read by Chat_call().
last_used_id = 0
def generate_unique_id():
    """Advance the module-level ``last_used_id`` and return its new value.

    The counter grows by a random step drawn with
    ``random.randint(100, 1000000)``, so successive calls within one process
    return strictly increasing integers.
    """
    global last_used_id
    step = random.randint(100, 1000000)
    last_used_id = last_used_id + step
    return last_used_id
def clear_button_callback():
    """Switch to a fresh conversation ID when the clear button is clicked.

    Calls ``generate_unique_id()``, which advances the module-level
    ``last_used_id``, and stores the resulting ID on ``chatbot.ID``.
    """
    chatbot.ID = generate_unique_id()
def save(ID, response, message_text):
    """Append one user/assistant exchange to the conversation stored under ``ID``.

    Both turns are pushed onto the ``message`` array of the document whose
    ``ID`` field equals ``ID``. No upsert is requested, so the caller is
    expected to have created the document beforehand.

    Args:
        ID: Value of the ``ID`` field identifying the conversation document.
        response: The assistant's reply; stored in its string form.
        message_text: The user's message; stored in its string form.

    Returns:
        ``response``, unchanged.
    """
    exchange = [
        {'role': 'user', 'content': f'{message_text}'},
        {'role': 'assistant', 'content': f'{response}'},
    ]
    # A single $push with $each appends both turns, in order, in one atomic
    # update, so a stored user turn can never be left without its reply.
    records.update_one({'ID': ID},
                       {'$push': {'message': {'$each': exchange}}})
    return response
######################################### | |
######################################### | |
def LLM_call(message_log, poll_interval=2, max_wait=600, request_timeout=30):
    """Submit ``message_log`` to the RunPod serverless endpoint and wait for the output.

    The job is started with a POST to ``/run`` and then polled through
    ``/status/<job id>`` until it reaches a terminal status or ``max_wait``
    seconds have elapsed.

    Args:
        message_log: Sent as the ``prompt`` field of the job input. Chat_call
            passes a list of ``{"role": ..., "content": ...}`` dicts.
        poll_interval: Seconds to sleep between status polls.
        max_wait: Upper bound, in seconds, on the total time spent polling.
        request_timeout: Timeout, in seconds, applied to each HTTP request.

    Returns:
        The ``output`` field of the completed job, or the string
        ``"Output not available"`` when the job could not be submitted, did
        not complete successfully, or did not finish within ``max_wait``.

    Raises:
        requests.RequestException: If the initial submit request itself fails
            (connection error or timeout). Errors while polling are treated
            as transient and retried until the deadline.
    """
    serverless_api_id = 'bi48s9yd8n0spz'
    base_url = f"https://api.runpod.ai/v2/{serverless_api_id}"

    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }

    # Generation parameters forwarded as the job input.
    data = {
        "input": {
            "prompt": message_log,
            "max_new_tokens": 4500,
            "temperature": 0.7,
            "top_k": 50,
            "top_p": 0.9,
            "repetition_penalty": 1.2,
            "batch_size": 8,
            "stop": ["</s>"]
        }
    }

    output = "Output not available"

    # Submit the job. A timeout keeps a stalled connection from hanging the UI.
    call = requests.post(f"{base_url}/run", headers=headers, json=data,
                         timeout=request_timeout)
    try:
        response_data = call.json()
    except ValueError:
        # The body was not JSON (e.g. an HTML error page).
        print("Failed to submit request. HTTP status code:", call.status_code)
        return output

    # A rejected submit (bad key, unknown endpoint, ...) carries no job id.
    msg_id = response_data.get('id') if isinstance(response_data, dict) else None
    if msg_id is None:
        print("Failed to submit request. Response:", response_data)
        return output
    print("Message ID:", msg_id)

    status_url = f"{base_url}/status/{msg_id}"
    deadline = time.monotonic() + max_wait

    # Poll the API until the job reaches a terminal status or the deadline passes.
    while True:
        try:
            response = requests.get(status_url, headers=headers,
                                    timeout=request_timeout)
        except requests.RequestException as exc:
            # Treat network hiccups as transient; the deadline bounds retries.
            print("Failed to get status. Error:", exc)
        else:
            if response.status_code == 200:
                response_data = response.json()
                status = response_data.get('status')

                if status == 'COMPLETED':
                    # Access the 'output' directly from the response
                    output = response_data.get('output', 'Output not available')
                    print("Response content:", output)
                    break

                # Every terminal non-success status ends the wait, not just
                # FAILED; otherwise a cancelled or timed-out job polls forever.
                if status in ('FAILED', 'CANCELLED', 'TIMED_OUT'):
                    error_message = response_data.get('error', 'Unknown error')
                    print("Request failed. Reason:", error_message)
                    break
            else:
                print("Failed to get status. HTTP status code:", response.status_code)

        if time.monotonic() >= deadline:
            print("Gave up waiting for the response after", max_wait, "seconds")
            break

        # Wait for a short time before polling again
        time.sleep(poll_interval)

    return output
def Chat_call(chat, prompt):
    """Produce the reply to ``chat`` for the current conversation and persist the exchange.

    The conversation is the document in ``records`` whose ``ID`` equals the
    module-level ``last_used_id``.

    * If no such document exists, an empty one is inserted and the reply is
      the fixed string ``"Hello"`` (the LLM is not called).
    * Otherwise the LLM is called with ``prompt`` as the system message, at
      most the five most recent stored messages, and ``chat`` as the new
      user message.

    In both cases the exchange is stored through ``save`` and the reply is
    returned.
    """
    ID = last_used_id
    existing_user_doc = records.find_one({'ID': ID})

    # First message of a conversation: create its document and greet.
    if not existing_user_doc:
        records.insert_one({'ID': ID, 'message': []})
        return save(ID, "Hello", chat)

    # Slicing with [-5:] keeps the whole history when it has five or fewer
    # entries and only the last five otherwise.
    recent = existing_user_doc['message'][-5:]
    message_log = [
        {"role": "system", "content": f"{prompt}"},
        *recent,
        {"role": "user", "content": chat},
    ]
    return save(ID, LLM_call(message_log), chat)
# Build the UI: a chat window, a message box, a system-prompt box and a clear
# button. Event listeners are registered inside the Blocks context.
with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    chatbot.ID = 10  # Initialize the ID attribute
    msg = gr.Textbox(label="Chat")
    prompt = gr.Textbox(label="Prompt")
    clear = gr.ClearButton([msg, chatbot, prompt])

    def respond(message, chat_history, prompt):
        """Handle a submitted message.

        Gets the bot reply from ``Chat_call``, appends the
        ``(message, reply)`` pair to ``chat_history`` and returns the new
        values for the three bound components: an empty message box, the
        updated history, and the prompt unchanged.
        """
        bot_message = Chat_call(message, prompt)
        chat_history.append((message, bot_message))
        # No artificial delay here: Chat_call already takes as long as the
        # real model needs.
        return "", chat_history, prompt

    msg.submit(respond, [msg, chatbot, prompt], [msg, chatbot, prompt])

    # Besides clearing the components, clicking the clear button starts a
    # new conversation ID.
    clear.click(lambda: clear_button_callback())

if __name__ == "__main__":
    demo.launch()