BeeWeb / app.py
swyx's picture
speaker joining and sorting
d0bef40
raw
history blame
15 kB
import gradio as gr
import asyncio
from typing import List, Dict, Any, Tuple, Generator
from beeai import Bee
from huggingface_hub import InferenceClient
import logging
from datetime import datetime
import pytz
import pandas as pd
from functools import partial
# Configure the root logger once at import time: records at INFO and above
# are written to app.log, and the file is opened in 'w' mode so each start
# of the app begins with a fresh log.
logging.basicConfig(
    filename='app.log',
    filemode='w',
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Module-level pagination state for the conversation list. These are plain
# globals, so every caller in this process reads and writes the same values.
current_page: int = 1  # 1-based index of the page currently shown
total_pages: int = 1  # refreshed from the API response by list_conversations
async def fetch_conversations(api_key: str, page: int = 1, limit: int = 15) -> Dict[str, Any]:
    """Fetch one page of the current user's conversations from the Bee API.

    Args:
        api_key: Bee API key used to construct the client.
        page: Page number to request (the default is the first page).
        limit: Maximum number of conversations per page. Defaults to 15,
            the page size this function previously hard-coded.

    Returns:
        The response of ``bee.get_conversations`` unchanged. Callers in this
        file read its "conversations" and "totalPages" keys.

    Raises:
        Any exception raised by the Bee client is propagated to the caller.
    """
    bee = Bee(api_key)
    logging.info(f"Fetching conversations for user 'me', page {page}")
    return await bee.get_conversations("me", page=page, limit=limit)
def format_end_time(end_time: str, tz_name: str = 'US/Pacific') -> str:
    """Render an ISO-8601 timestamp as a 12-hour clock time in a given timezone.

    Args:
        end_time: ISO-8601 timestamp; a trailing 'Z' is accepted as UTC.
        tz_name: Name of the timezone to display the time in. Defaults to
            'US/Pacific', the zone this function previously hard-coded.

    Returns:
        A string such as "03:45 PM PDT": hour, minute, AM/PM marker and the
        timezone abbreviation.

    Raises:
        ValueError: If ``end_time`` is not a valid ISO-8601 timestamp.
        pytz.UnknownTimeZoneError: If ``tz_name`` is not a known timezone.
    """
    # TODO: callers still rely on the default; pass the actual user timezone.
    # datetime.fromisoformat() only accepts the 'Z' suffix from Python 3.11,
    # so normalise it to an explicit offset first.
    utc_time = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
    local_time = utc_time.astimezone(pytz.timezone(tz_name))
    return local_time.strftime('%I:%M %p %Z')
async def fetch_conversation(api_key: str, conversation_id: int) -> Dict[str, Any]:
    """Fetch a single conversation of the current user from the Bee API.

    Args:
        api_key: Bee API key used to construct the client.
        conversation_id: Identifier of the conversation to fetch.

    Returns:
        The response of ``bee.get_conversation`` on success. If the request
        (or logging the response) raises, the exception is logged and a dict
        with a single "error" key describing the failure is returned instead.
    """
    bee_client = Bee(api_key)
    try:
        logging.info(f"Fetching conversation with ID: {conversation_id}")
        payload = await bee_client.get_conversation("me", conversation_id)
        logging.debug(f"Raw conversation data: {payload}")
        return payload
    except Exception as err:
        reason = str(err)
        logging.error(f"Error fetching conversation {conversation_id}: {reason}")
        return {"error": f"Failed to fetch conversation: {reason}"}
def _parse_iso_timestamp(timestamp: str) -> datetime:
    """Parse an ISO-8601 timestamp, accepting a trailing 'Z' as a +00:00 offset."""
    return datetime.fromisoformat(timestamp.replace('Z', '+00:00'))


def _normalize_speaker(raw_speaker: Any) -> Any:
    """Return the speaker label as an int when it is all digits, else as a string."""
    # The raw value may be missing, null, or already numeric; go through str()
    # so none of those cases raises.
    label = '' if raw_speaker is None else str(raw_speaker)
    return int(label) if label.isdecimal() else label


def _format_time_header(start_time: Any, end_time: Any, pacific_tz: Any) -> str:
    """Build the start/end time fragment that follows the conversation title."""
    if start_time and end_time:
        start_pacific = _parse_iso_timestamp(start_time).astimezone(pacific_tz)
        end_pacific = _parse_iso_timestamp(end_time).astimezone(pacific_tz)
        if start_pacific.date() == end_pacific.date():
            # Same calendar day: a compact "start - end" range is enough.
            return f"{start_pacific.strftime('%I:%M %p')} - {end_pacific.strftime('%I:%M %p')} PT\n\n"
        return (
            f"\n\n**Start**: {start_pacific.strftime('%Y-%m-%d %I:%M %p')} PT\n"
            f"**End**: {end_pacific.strftime('%Y-%m-%d %I:%M %p')} PT\n"
        )
    if start_time:
        return f"**Start**: {format_end_time(start_time)}\n"
    if end_time:
        return f"**End**: {format_end_time(end_time)}\n"
    return ""


def _format_transcript(utterances: Any, pacific_tz: Any) -> str:
    """Render utterances chronologically, merging consecutive turns of one speaker."""
    parts = ["\n\n## Transcriptions\n\n"]
    # `or ''` keeps the sort key a string even when 'spoken_at' is null.
    ordered = sorted(utterances, key=lambda u: u.get('spoken_at') or '')

    last_spoken_dt = None
    last_speaker = None
    for utterance in ordered:
        spoken_at = utterance.get('spoken_at')
        spoken_dt = _parse_iso_timestamp(spoken_at) if spoken_at else None
        speaker = _normalize_speaker(utterance.get('speaker'))
        text = utterance.get('text')

        # Insert a clock-time marker after a pause of more than 5 minutes.
        # The marker uses Pacific time so it agrees with the "PT" header.
        if last_spoken_dt is not None and spoken_dt is not None:
            if (spoken_dt - last_spoken_dt).total_seconds() > 300:
                marker = spoken_dt.astimezone(pacific_tz).strftime('%I:%M %p')
                parts.append(f"\n\n[{marker}]\n")

        if speaker != last_speaker:
            parts.append(f"\n\nSpeaker **[{speaker}](https://kagi.com/search?q={spoken_at})**: {text}")
        else:
            # Same speaker keeps talking: continue the current paragraph.
            parts.append(f" {text}")

        last_spoken_dt = spoken_dt
        last_speaker = speaker
    return "".join(parts)


def format_conversation(data: Dict[str, Any]) -> str:
    """Render a fetched conversation as Markdown.

    The output contains a title with the conversation id, the start/end
    times in US/Pacific, the short summary and summary when present, and
    the utterances of the first transcription.

    Args:
        data: API response; the conversation is read from its
            "conversation" key.

    Returns:
        The Markdown text. If anything goes wrong while formatting, the
        error is logged and a message starting with
        "Error formatting conversation:" is returned, followed by the raw
        conversation data.
    """
    # Bound before the try so the error handler can always report it.
    conversation = None
    try:
        conversation = data.get("conversation", {})
        logging.debug(f"Conversation keys: {conversation.keys()}")

        pacific_tz = pytz.timezone('US/Pacific')
        parts = [f"# Conversation [{conversation['id']}] "]
        parts.append(
            _format_time_header(
                conversation.get('start_time'),
                conversation.get('end_time'),
                pacific_tz,
            )
        )

        # Summaries may be absent or null; skip them rather than print "None".
        short_summary = conversation.get('short_summary')
        if short_summary:
            parts.append(f"\n## Short Summary\n\n{short_summary}\n")
        parts.append("\n")  # Add a newline for better readability
        summary = conversation.get('summary')
        if summary:
            parts.append(f"\n{summary}")

        transcriptions = conversation.get('transcriptions')
        if transcriptions:
            # Only the first transcription is rendered.
            utterances = transcriptions[0].get('utterances') or []
            parts.append(_format_transcript(utterances, pacific_tz))

        return "".join(parts)
    except Exception as e:
        logging.error(f"Error formatting conversation: {str(e)}")
        return f"Error formatting conversation: {str(e)}\n\nRaw data: {conversation}"
def format_duration(start_time: str, end_time: str) -> str:
    """Return the time between two ISO-8601 timestamps as "<hours>h <minutes>m".

    Args:
        start_time: ISO-8601 start timestamp; a trailing 'Z' is accepted.
        end_time: ISO-8601 end timestamp; a trailing 'Z' is accepted.

    Returns:
        Whole hours and whole remaining minutes, e.g. "1h 30m". Seconds are
        dropped, not rounded.
    """
    began, finished = (
        datetime.fromisoformat(stamp.replace('Z', '+00:00'))
        for stamp in (start_time, end_time)
    )
    elapsed_seconds = (finished - began).total_seconds()
    whole_hours, leftover_seconds = divmod(elapsed_seconds, 3600)
    whole_minutes = leftover_seconds // 60
    return f"{whole_hours:.0f}h {whole_minutes:.0f}m"
async def list_conversations(api_key: str) -> Tuple[pd.DataFrame, str, int, int]:
    """Fetch the current page of conversations and shape it for the table.

    Reads the module-level ``current_page`` and updates the module-level
    ``total_pages`` from the API response.

    Args:
        api_key: Bee API key.

    Returns:
        A tuple ``(df, info, current_page, total_pages)`` where ``df`` has
        the columns "ID", "End Time", "Duration" and "Summary" (also when
        the page is empty) and ``info`` is a "Page X of Y" label.
    """
    global current_page, total_pages
    conversations_data = await fetch_conversations(api_key, current_page)
    conversations = conversations_data.get("conversations") or []
    # Treat a missing or zero page count as a single page so the label and
    # the paging buttons stay consistent.
    total_pages = conversations_data.get("totalPages") or 1

    rows = []
    for c in conversations:
        start_time = c.get('start_time')
        end_time = c.get('end_time')
        short_summary = c.get('short_summary')
        rows.append({
            "ID": c['id'],
            "End Time": format_end_time(end_time) if end_time else "",
            "Duration": format_duration(start_time, end_time) if start_time and end_time else "",
            # NOTE(review): the slice starts at index 1, so the first word of
            # the summary is dropped — confirm this is intended.
            "Summary": ' '.join(short_summary.split()[1:21]) + "..." if short_summary else "",
        })

    # Passing `columns` fixes the column order (ID first) and keeps the frame
    # well-formed for an empty page; selecting the columns afterwards raised
    # KeyError when there were no rows.
    df = pd.DataFrame(rows, columns=["ID", "End Time", "Duration", "Summary"])
    info = f"Page {current_page} of {total_pages}"
    return df, info, current_page, total_pages
async def display_conversation(api_key: str, conversation_id: int) -> str:
    """Fetch one conversation and return it as Markdown.

    Args:
        api_key: Bee API key.
        conversation_id: Identifier of the conversation to show.

    Returns:
        The formatted conversation, or the error text when the fetch result
        carries an "error" key.
    """
    payload = await fetch_conversation(api_key, conversation_id)
    if "error" not in payload:
        return format_conversation(payload)

    # The fetch failed: log the message and show it in place of the details.
    failure = payload["error"]
    logging.error(f"Error in full_conversation: {failure}")
    return failure
async def delete_conversation(api_key: str, conversation_id: int) -> str:
    """Delete one of the current user's conversations through the Bee API.

    Args:
        api_key: Bee API key used to construct the client.
        conversation_id: Identifier of the conversation to delete.

    Returns:
        A human-readable status message. A failing delete call is logged and
        reported in the returned message rather than raised.
    """
    bee_client = Bee(api_key)
    try:
        await bee_client.delete_conversation("me", conversation_id)
    except Exception as err:
        reason = str(err)
        logging.error(f"Error deleting conversation {conversation_id}: {reason}")
        return f"Failed to delete conversation: {reason}"
    return f"Conversation {conversation_id} deleted successfully."
# Shared inference client used by respond() for the chat section.
# Qwen/Qwen2.5-14B-Instruct was tried first but needs more memory than is
# available in the free tier ("The model Qwen/Qwen2.5-14B-Instruct is too
# large to be loaded automatically (29GB > 10GB)"), so a smaller model is used:
# client = InferenceClient("Qwen/Qwen2.5-14B-Instruct")
client = InferenceClient("microsoft/Phi-3-mini-128k-instruct")
def respond(
    message: str,
    history: List[Tuple[str, str]],
    system_message: str,
    max_tokens: int,
    temperature: float,
    top_p: float,
    conversation_context: str
) -> Generator[str, None, None]:
    """Stream a chat reply about the selected conversation.

    Args:
        message: The user's new chat message.
        history: Earlier (user, assistant) turns of the chat.
        system_message: System prompt for the model.
        max_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        conversation_context: Text of the selected conversation, passed to
            the model as a second system message.

    Yields:
        The reply accumulated so far, once per received stream chunk.
    """
    messages = [
        {"role": "system", "content": system_message},
        {"role": "system", "content": f"Here's the context of the conversation: {conversation_context}"},
    ]
    for human, assistant in history:
        # A turn may be missing one side (e.g. no reply yet); skip empty
        # sides instead of sending a message with no content.
        if human:
            messages.append({"role": "user", "content": human})
        if assistant:
            messages.append({"role": "assistant", "content": assistant})
    messages.append({"role": "user", "content": message})

    response = ""
    # The loop variable is deliberately not called `message`, so it does not
    # shadow the parameter.
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        if not chunk.choices:
            continue
        token = chunk.choices[0].delta.content
        # A stream chunk can carry no text (content is None); appending None
        # to a str would raise TypeError.
        if token:
            response += token
        yield response
def get_selected_conversation_id(table_data):
    """Return the value in the first cell of the first row, or None if there are no rows.

    The conversation ID is assumed to be in the first column of the table.
    """
    has_rows = bool(table_data) and len(table_data) > 0
    if not has_rows:
        return None
    first_row = table_data[0]
    return first_row[0]
async def delete_selected_conversation(api_key: str, conversation_id: int):
    """Delete the selected conversation and refresh the conversation list.

    Args:
        api_key: Bee API key.
        conversation_id: Identifier of the conversation to delete.

    Returns:
        A 5-tuple for the Gradio outputs wired to the delete button:
        a status message, the refreshed table, the refreshed page info, an
        update that hides the delete button, and an empty string. When
        nothing is deleted or the refresh fails, the table and page info
        are left as they are instead of being blanked.
    """
    hide_delete_button = gr.update(visible=False)

    if not api_key or not conversation_id:
        # gr.update() with no arguments leaves a component untouched;
        # returning None here would clear the table and the info box.
        return "No conversation selected or API key missing", gr.update(), gr.update(), hide_delete_button, ""

    logging.info(f"Deleting conversation with ID: {conversation_id}")
    try:
        result = await delete_conversation(api_key, conversation_id)
        # Reload the current page so the deleted row disappears.
        df, info, _, _ = await list_conversations(api_key)
        return result, df, info, hide_delete_button, ""
    except Exception as e:
        error_message = f"Error deleting conversation: {str(e)}"
        logging.error(error_message)
        return error_message, gr.update(), gr.update(), hide_delete_button, ""
# Gradio UI: a paginated conversation list on the left, the selected
# conversation on the right, and a chat about the selected conversation below.
with gr.Blocks() as demo:
    gr.Markdown("# Bee AI Conversation Viewer and Chat. See [source](https://github.com/swyxio/BeeWeb/) and [Space](https://huggingface.co/spaces/swyx/BeeWeb)")

    with gr.Row():
        # Left column: API key, conversation table and paging controls.
        with gr.Column(scale=1):
            api_key = gr.Textbox(label="Enter your Bee API Key", type="password")
            load_button = gr.Button("Load Conversations")
            conversation_table = gr.Dataframe(
                label="Select a conversation (CLICK ON THE ID!!!)",
                interactive=True,
                row_count=10,  # Adjust this number to approximate the desired height
            )
            info_text = gr.Textbox(label="Info", interactive=False)
            prev_page = gr.Button("Previous Page")
            next_page = gr.Button("Next Page")

        # Right column: details of the selected conversation.
        with gr.Column(scale=2):
            conversation_details = gr.Markdown(
                label="Conversation Details",
                value="Enter your Bee API Key, click 'Load Conversations', then select a conversation to view details here.",
            )
            delete_button = gr.Button("Delete Conversation", visible=False)

    # Per-session state: the selected conversation's ID and its formatted
    # text, which is passed to the chat as context.
    selected_conversation_id = gr.State(None)
    conversation_context = gr.State("")

    def page_button_updates(page: int, pages: int) -> Tuple[Any, Any]:
        """Return updates that disable "Previous" on the first page and "Next" on the last."""
        return gr.update(interactive=page != 1), gr.update(interactive=page != pages)

    async def load_conversations(api_key):
        """Load the current page of conversations into the table."""
        try:
            df, info, page, pages = await list_conversations(api_key)
            prev_update, next_update = page_button_updates(page, pages)
            return df, info, gr.update(visible=True), prev_update, next_update
        except Exception as e:
            error_message = f"Error loading conversations: {str(e)}"
            logging.error(error_message)
            return None, error_message, gr.update(visible=False), gr.update(interactive=False), gr.update(interactive=False)

    load_button.click(
        load_conversations,
        inputs=[api_key],
        outputs=[conversation_table, info_text, delete_button, prev_page, next_page],
    )

    async def update_conversation(api_key, evt: gr.SelectData):
        """Show the conversation whose ID cell was clicked in the table."""
        try:
            logging.info(f"SelectData event: index={evt.index}, value={evt.value}")
            # The clicked cell must hold the conversation ID; any other cell
            # fails int() and is reported through the except branch below.
            conversation_id = int(evt.value)
            logging.info(f"Updating conversation with ID: {conversation_id}")

            # Return a loading message immediately. The chat context is reset
            # to "" (its initial value), not None, so the chat prompt never
            # contains the text "None".
            yield gr.update(value="Loading conversation details...", visible=True), gr.update(visible=False), None, ""

            # Fetch and format the conversation
            formatted_conversation = await display_conversation(api_key, conversation_id)

            # Show the conversation, reveal the delete button and remember
            # both the ID and the text for the chat.
            yield formatted_conversation, gr.update(visible=True), conversation_id, formatted_conversation
        except Exception as e:
            error_message = f"Error updating conversation: {str(e)}"
            logging.error(error_message)
            yield error_message, gr.update(visible=False), None, ""

    conversation_table.select(
        update_conversation,
        inputs=[api_key],
        outputs=[conversation_details, delete_button, selected_conversation_id, conversation_context],
    )
    # TODO: scroll the conversation details into view after a selection
    # (needs an element id on the details component and a JS hook).

    # delete_selected_conversation returns (message, table, info,
    # delete-button update, ""). Each output component is listed only once:
    # the message goes to the details pane and the trailing "" clears the
    # chat context of the deleted conversation. Listing conversation_details
    # twice made the message and the "" compete for the same component.
    delete_button.click(
        delete_selected_conversation,
        inputs=[api_key, selected_conversation_id],
        outputs=[conversation_details, conversation_table, info_text, delete_button, conversation_context],
    )

    async def change_page(api_key: str, direction: int) -> Tuple[pd.DataFrame, str, Any, Any]:
        """Move one page backward (-1) or forward (+1) and reload the table."""
        global current_page, total_pages
        # Ensure page is within bounds
        current_page = max(1, min(current_page + direction, total_pages))
        # list_conversations reads current_page and refreshes total_pages.
        df, info, page, pages = await list_conversations(api_key)
        prev_update, next_update = page_button_updates(page, pages)
        return df, info, prev_update, next_update

    prev_page.click(
        partial(change_page, direction=-1),
        inputs=[api_key],
        outputs=[conversation_table, info_text, prev_page, next_page],
    )
    next_page.click(
        partial(change_page, direction=1),
        inputs=[api_key],
        outputs=[conversation_table, info_text, prev_page, next_page],
    )

    gr.Markdown("## Chat about the conversation")
    # The additional inputs are passed to respond() after (message, history),
    # in this order.
    chat_interface = gr.ChatInterface(
        respond,
        additional_inputs=[
            gr.Textbox(value="You are a friendly Chatbot. Analyze and discuss the given conversation context.", label="System message"),
            gr.Slider(minimum=1, maximum=2048, value=2048, step=1, label="Max new tokens"),
            gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
            gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
            conversation_context,
        ],
    )
# Start the Gradio app only when this file is run as a script, not on import.
if __name__ == "__main__":
    demo.launch()