import gradio as gr import random import openai from openai import APIError, APIConnectionError, RateLimitError import os from PIL import Image # This is the corrected import import io import base64 import asyncio from queue import Queue from threading import Thread import time # Get the current script's directory current_dir = os.path.dirname(os.path.abspath(__file__)) avatars_dir = os.path.join(current_dir, "avatars") # Dictionary mapping characters to their avatar image filenames character_avatars = { "Harry Potter": "harry.png", "Hermione Granger": "hermione.png", "poor Ph.D. student": "phd.png", "Donald Trump": "trump.png", "a super cute red panda": "red_panda.png" } predefined_characters = ["Harry Potter", "Hermione Granger", "poor Ph.D. student", "Donald Trump", "a super cute red panda"] def get_character(dropdown_value, custom_value): return custom_value if dropdown_value == "Custom" else dropdown_value def resize_image(image_path, size=(100, 100)): if not os.path.exists(image_path): return None with Image.open(image_path) as img: img.thumbnail(size) buffered = io.BytesIO() img.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode() resized_avatars = {} for character, filename in character_avatars.items(): full_path = os.path.join(avatars_dir, filename) if os.path.exists(full_path): resized_avatars[character] = resize_image(full_path) else: pass async def generate_response_stream(messages, api_key): client = openai.AsyncOpenAI( api_key=api_key, base_url="https://api.sambanova.ai/v1", ) try: if len(messages) >= 10: # avoid hitting rate limit time.sleep(0.5) response = await client.chat.completions.create( model='Meta-Llama-3.1-405B-Instruct', messages=messages, temperature=0.7, top_p=0.9, stream=True ) full_response = "" async for chunk in response: if chunk.choices[0].delta.content is not None: full_response += chunk.choices[0].delta.content yield full_response except Exception as e: yield f"Error: {str(e)}" async def simulate_conversation_stream(character1, character2, initial_message, num_turns, api_key): messages_character_1 = [{"role": "system", "content": f"Avoid overly verbose answer in your response. Act as {character1}."}, {"role": "assistant", "content": initial_message}] messages_character_2 = [{"role": "system", "content": f"Avoid overly verbose answer in your response. Act as {character2}."}, {"role": "user", "content": initial_message}] conversation = [ {"character": character1, "content": initial_message}, {"character": character2, "content": ""} # Initialize with an empty response for character2 ] yield format_conversation_as_html(conversation) num_turns *= 2 for turn_num in range(num_turns - 1): current_character = character2 if turn_num % 2 == 0 else character1 messages = messages_character_2 if turn_num % 2 == 0 else messages_character_1 full_response = "" async for response in generate_response_stream(messages, api_key): full_response = response conversation[-1]["content"] = full_response yield format_conversation_as_html(conversation) if turn_num % 2 == 0: messages_character_1.append({"role": "user", "content": full_response}) messages_character_2.append({"role": "assistant", "content": full_response}) else: messages_character_2.append({"role": "user", "content": full_response}) messages_character_1.append({"role": "assistant", "content": full_response}) # Add a new empty message for the next turn, if it's not the last turn if turn_num < num_turns - 2: next_character = character1 if turn_num % 2 == 0 else character2 conversation.append({"character": next_character, "content": ""}) def stream_conversation(character1, character2, initial_message, num_turns, api_key, queue): async def run_simulation(): async for html in simulate_conversation_stream(character1, character2, initial_message, num_turns, api_key): queue.put(html) queue.put(None) # Signal that the conversation is complete asyncio.run(run_simulation()) def chat_interface(character1_dropdown, character1_custom, character2_dropdown, character2_custom, initial_message, num_turns, api_key): character1 = get_character(character1_dropdown, character1_custom) character2 = get_character(character2_dropdown, character2_custom) queue = Queue() thread = Thread(target=stream_conversation, args=(character1, character2, initial_message, num_turns, api_key, queue)) thread.start() while True: result = queue.get() if result is None: break yield result thread.join() def format_conversation_as_html(conversation): html_output = """