"""Streamlit demo app for the GPT-4o model: text, image, audio and video input.

Each ``process_*`` function sends one kind of user input to the OpenAI API,
renders the answer in the Streamlit page, saves it as a Markdown file in the
working directory and records the exchange in ``st.session_state.messages``.
"""

import streamlit as st
import openai
from openai import OpenAI
import os
import base64
import cv2
from moviepy.editor import VideoFileClip
import pytz
from datetime import datetime
import glob
from audio_recorder_streamlit import audio_recorder

# Set API key and organization ID from environment variables
openai.api_key = os.getenv('OPENAI_API_KEY')
openai.organization = os.getenv('OPENAI_ORG_ID')
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('OPENAI_ORG_ID'))

# Define the model to be used
MODEL = "gpt-4o-2024-05-13"

# System prompt shared by the audio and video transcript summarizers.
# The embedded newline is part of the original prompt text.
_SUMMARY_SYSTEM_PROMPT = """You are generating a transcript summary. 
Create a summary of the provided transcription. Respond in Markdown."""


def _ensure_messages():
    """Create the chat history list in session state if it does not exist yet."""
    if "messages" not in st.session_state:
        st.session_state.messages = []


def generate_filename(prompt: str, file_type: str) -> str:
    """Build a file name of the form ``MMDD_HHMM_<sanitized prompt>.<file_type>``.

    The timestamp is taken in the US/Central time zone. Spaces and newlines in
    ``prompt`` become underscores, every other non-alphanumeric character is
    dropped, and the result is truncated to 90 characters.
    """
    central = pytz.timezone('US/Central')
    safe_date_time = datetime.now(central).strftime("%m%d_%H%M")
    replaced_prompt = prompt.replace(" ", "_").replace("\n", "_")
    safe_prompt = "".join(x for x in replaced_prompt if x.isalnum() or x == "_")[:90]
    return f"{safe_date_time}_{safe_prompt}.{file_type}"


def create_file(filename, prompt, response, should_save=True):
    """Write ``response`` to ``<filename without extension>.md``.

    Nothing is written when ``should_save`` is false or when the extension of
    ``filename`` is not one of ``.txt``, ``.htm`` or ``.md``. ``prompt`` is
    accepted for call-site symmetry but is not written to the file.
    """
    if not should_save:
        return
    base_filename, ext = os.path.splitext(filename)
    if ext in ['.txt', '.htm', '.md']:
        with open(f"{base_filename}.md", 'w', encoding='utf-8') as file:
            file.write(response)


def process_text(text_input):
    """Send ``text_input`` plus the chat history to the model and show the reply.

    The user message and the assistant reply are both appended to the session
    history, and the reply is saved to a Markdown file.
    """
    if not text_input:
        return
    _ensure_messages()
    st.session_state.messages.append({"role": "user", "content": text_input})
    with st.chat_message("user"):
        st.markdown(text_input)
    with st.chat_message("assistant"):
        completion = client.chat.completions.create(
            model=MODEL,
            messages=[
                {"role": m["role"], "content": m["content"]}
                for m in st.session_state.messages
            ],
            stream=False
        )
        return_text = completion.choices[0].message.content
        st.write("Assistant: " + return_text)
        filename = generate_filename(text_input, "md")
        create_file(filename, text_input, return_text, should_save=True)
        st.session_state.messages.append({"role": "assistant", "content": return_text})


def process_text2(MODEL='gpt-4o-2024-05-13', text_input='What is 2+2 and what is an imaginary number'):
    """Append ``text_input`` as a user message, query the model, return the reply.

    Unlike ``process_text`` this does NOT append the assistant reply to the
    history; the caller is expected to do so. Returns ``None`` when
    ``text_input`` is empty.
    """
    if not text_input:
        return None
    _ensure_messages()
    st.session_state.messages.append({"role": "user", "content": text_input})
    completion = client.chat.completions.create(
        model=MODEL,
        messages=st.session_state.messages
    )
    return_text = completion.choices[0].message.content
    st.write("Assistant: " + return_text)
    filename = generate_filename(text_input, "md")
    create_file(filename, text_input, return_text, should_save=True)
    return return_text


def save_image(image_input, filename):
    """Save the uploaded image's bytes to ``filename`` and return that name."""
    # Only the final path component is used so the write stays in the
    # working directory even if the supplied name contains separators.
    filename = os.path.basename(filename)
    with open(filename, "wb") as f:
        f.write(image_input.getvalue())
    return filename


def process_image(image_input):
    """Ask the model to describe an uploaded image and save the answer.

    Returns the model's Markdown description, or ``None`` when no image was
    supplied.
    """
    if not image_input:
        return None
    _ensure_messages()
    st.markdown('Processing image: ' + image_input.name)
    # getvalue() is independent of the stream position, so Streamlit reruns
    # do not yield an empty payload the way a second read() would.
    base64_image = base64.b64encode(image_input.getvalue()).decode("utf-8")
    # Use the MIME type reported for the upload; the uploader also accepts
    # JPEG, so a hard-coded image/png would mislabel those files.
    mime_type = getattr(image_input, "type", None) or "image/png"
    question = "Help me understand what is in this picture and list ten facts as markdown outline with appropriate emojis that describes what you see."
    st.session_state.messages.append({"role": "user", "content": [
        {"type": "text", "text": question},
        {"type": "image_url", "image_url": {
            "url": f"data:{mime_type};base64,{base64_image}"}
        }
    ]})
    response = client.chat.completions.create(
        model=MODEL,
        messages=st.session_state.messages,
        temperature=0.0,
    )
    image_response = response.choices[0].message.content
    st.markdown(image_response)

    filename_md = generate_filename(image_input.name + '- ' + image_response, "md")
    create_file(filename_md, question, image_response, should_save=True)

    # The image itself is stored under its original upload name.
    save_image(image_input, image_input.name)

    st.session_state.messages.append({"role": "assistant", "content": image_response})
    return image_response


def _summarize_transcript(transcript_text):
    """Return a Markdown summary of ``transcript_text`` produced by the model."""
    response = client.chat.completions.create(
        model=MODEL,
        messages=[
            {"role": "system", "content": _SUMMARY_SYSTEM_PROMPT},
            {"role": "user", "content": [{"type": "text", "text": f"The audio transcription is: {transcript_text}"}], }
        ],
        temperature=0,
    )
    return response.choices[0].message.content


def process_audio(audio_input):
    """Transcribe an uploaded audio file with Whisper and show a summary."""
    if not audio_input:
        return
    _ensure_messages()
    transcription = client.audio.transcriptions.create(
        model="whisper-1",
        file=audio_input,
    )
    # Store the transcript text (not the file object) so the history can be
    # replayed to the chat completions API later.
    st.session_state.messages.append({"role": "user", "content": transcription.text})
    audio_response = _summarize_transcript(transcription.text)
    st.markdown(audio_response)
    filename = generate_filename(transcription.text, "md")
    create_file(filename, transcription.text, audio_response, should_save=True)
    st.session_state.messages.append({"role": "assistant", "content": audio_response})


def process_audio_for_video(video_input):
    """Transcribe the audio of an uploaded video and return a Markdown summary.

    Returns ``None`` when no video was supplied.
    """
    if not video_input:
        return None
    _ensure_messages()
    transcription = client.audio.transcriptions.create(
        model="whisper-1",
        file=video_input,
    )
    transcript_text = transcription.text
    st.session_state.messages.append({"role": "user", "content": transcript_text})
    video_response = _summarize_transcript(transcript_text)
    st.markdown(video_response)
    filename = generate_filename(transcript_text, "md")
    create_file(filename, transcript_text, video_response, should_save=True)
    st.session_state.messages.append({"role": "assistant", "content": video_response})
    return video_response


def save_video(video_file):
    """Save the uploaded video under its own (base) name and return the path."""
    video_path = os.path.basename(video_file.name)
    with open(video_path, "wb") as f:
        f.write(video_file.getbuffer())
    return video_path


def process_video(video_path, seconds_per_frame=2):
    """Sample frames from a video and extract its audio track to an MP3 file.

    Args:
        video_path: Path of the video file on disk.
        seconds_per_frame: Sampling interval; one frame is taken every this
            many seconds of video.

    Returns:
        ``(base64_frames, audio_path)`` where ``base64_frames`` is a list of
        base64-encoded JPEG strings and ``audio_path`` is the MP3 path, or
        ``None`` when the video has no audio track.
    """
    base64Frames = []
    base_video_path, _ = os.path.splitext(video_path)

    video = cv2.VideoCapture(video_path)
    try:
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = video.get(cv2.CAP_PROP_FPS)
        # Always advance by at least one frame: a zero step (unknown fps or a
        # very small interval) would otherwise never terminate.
        frames_to_skip = max(1, int(fps * seconds_per_frame))
        curr_frame = 0

        # Loop through the video and extract frames at specified sampling rate
        while curr_frame < total_frames - 1:
            video.set(cv2.CAP_PROP_POS_FRAMES, curr_frame)
            success, frame = video.read()
            if not success:
                break
            _, buffer = cv2.imencode(".jpg", frame)
            base64Frames.append(base64.b64encode(buffer).decode("utf-8"))
            curr_frame += frames_to_skip
    finally:
        video.release()

    # Extract audio from video
    audio_path = f"{base_video_path}.mp3"
    clip = VideoFileClip(video_path)
    try:
        if clip.audio is not None:
            clip.audio.write_audiofile(audio_path, bitrate="32k")
            clip.audio.close()
        else:
            # Silent video: there is nothing to extract.
            audio_path = None
    finally:
        clip.close()

    print(f"Extracted {len(base64Frames)} frames")
    print(f"Extracted audio to {audio_path}")
    return base64Frames, audio_path


def save_and_play_audio(audio_recorder):
    """Render the recorder widget; save and play back any recording.

    Args:
        audio_recorder: The recorder widget callable; it is invoked with
            ``key='audio_recorder'`` and is expected to return WAV bytes or a
            falsy value.

    Returns:
        The saved ``.wav`` file name, or ``None`` when nothing was recorded.
    """
    audio_bytes = audio_recorder(key='audio_recorder')
    if audio_bytes:
        filename = generate_filename("Recording", "wav")
        with open(filename, 'wb') as f:
            f.write(audio_bytes)
        st.audio(audio_bytes, format="audio/wav")
        return filename
    return None


def process_audio_and_video(video_input):
    """Summarize an uploaded video using both sampled frames and its audio."""
    if video_input is None:
        return
    _ensure_messages()

    # Save the uploaded video file
    video_path = save_video(video_input)

    # Process the saved video
    base64Frames, audio_path = process_video(video_path, seconds_per_frame=1)

    # NOTE: process_audio_for_video returns the *summary* of the transcript;
    # that summary is what gets passed to the model alongside the frames.
    transcript = process_audio_for_video(video_input) or ""

    # Generate a summary with visual and audio
    st.session_state.messages.append({"role": "user", "content": [
        "These are the frames from the video.",
        *map(lambda x: {"type": "image_url",
                        "image_url": {"url": f'data:image/jpg;base64,{x}', "detail": "low"}}, base64Frames),
        {"type": "text", "text": f"The audio transcription is: {transcript}"}
    ]})
    response = client.chat.completions.create(
        model=MODEL,
        messages=st.session_state.messages,
        temperature=0,
    )
    video_response = response.choices[0].message.content
    st.markdown(video_response)

    filename = generate_filename(transcript, "md")
    create_file(filename, transcript, video_response, should_save=True)
    st.session_state.messages.append({"role": "assistant", "content": video_response})


def _transcribe_file(path):
    """Transcribe the audio file at ``path`` with Whisper and return the text."""
    with open(path, "rb") as audio_file:
        transcription = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
        )
    return transcription.text


def main():
    """Render the Streamlit page: input selector, file gallery, chat, recorder."""
    # The history must exist before any handler appends to it.
    _ensure_messages()

    st.markdown("##### GPT-4o Omni Model: Text, Audio, Image, & Video")
    option = st.selectbox("Select an option", ("Text", "Image", "Audio", "Video"))
    if option == "Text":
        text_input = st.text_input("Enter your text:")
        if text_input:
            process_text(text_input)
    elif option == "Image":
        image_input = st.file_uploader("Upload an image", type=["jpg", "jpeg", "png"])
        process_image(image_input)
    elif option == "Audio":
        audio_input = st.file_uploader("Upload an audio file", type=["mp3", "wav"])
        process_audio(audio_input)
    elif option == "Video":
        video_input = st.file_uploader("Upload a video file", type=["mp4"])
        process_audio_and_video(video_input)

    # File Gallery
    all_files = glob.glob("*.md")
    all_files = [file for file in all_files if len(os.path.splitext(file)[0]) >= 10]  # exclude files with short names
    # Sort by (extension, name) descending; with the MMDD_HHMM prefix this
    # lists the most recently generated files first.
    all_files.sort(key=lambda x: (os.path.splitext(x)[1], x), reverse=True)
    st.sidebar.title("File Gallery")
    for file in all_files:
        with st.sidebar.expander(file):
            with open(file, "r", encoding="utf-8") as f:
                file_content = f.read()
            st.code(file_content, language="markdown")

    # ChatBot Entry
    if prompt := st.chat_input("GPT-4o Multimodal ChatBot - What can I help you with?"):
        with st.chat_message("user"):
            st.markdown(prompt)
        with st.chat_message("assistant"):
            # process_text2 appends the user message itself and performs the
            # single model call; only the assistant reply is recorded here.
            response = process_text2(text_input=prompt)
        st.session_state.messages.append({"role": "assistant", "content": response})

    # Voice entry: record, transcribe, then answer the transcript in chat.
    filename = save_and_play_audio(audio_recorder)
    if filename is not None:
        # NOTE(review): the previous code called transcribe_canary() and
        # search_arxiv(), neither of which is defined or imported in this
        # file; transcription now uses the Whisper endpoint already used
        # elsewhere in this module. Re-add the arXiv lookup once a
        # search_arxiv implementation is available.
        transcript = _transcribe_file(filename)
        if transcript:
            with st.chat_message("user"):
                st.markdown(transcript)
            with st.chat_message("assistant"):
                response = process_text2(text_input=transcript)
            st.session_state.messages.append({"role": "assistant", "content": response})


if __name__ == "__main__":
    main()