import streamlit as st from streamlit_webrtc import VideoProcessorBase, webrtc_streamer import uuid import requests import json from typing import Dict, Optional import time class VideoProcessor(VideoProcessorBase): def recv(self, frame): img = frame.to_ndarray(format="bgr24") return img class WebRTCConnection: def __init__(self): self.server_url = "http://localhost:7860" self.peer_id: Optional[str] = None def register_peer(self, peer_id: str) -> Dict: try: response = requests.post( f"{self.server_url}/register_peer", json={"peer_id": peer_id}, timeout=5 # Add timeout ) return response.json() except requests.exceptions.RequestException as e: st.error(f"Connection error: {str(e)}") return {"status": "error", "message": str(e)} def send_offer(self, peer_id: str, offer: Dict) -> Dict: try: response = requests.post( f"{self.server_url}/send_offer", json={"peer_id": peer_id, "offer": offer}, timeout=5 ) return response.json() except requests.exceptions.RequestException as e: st.error(f"Connection error: {str(e)}") return {"status": "error", "message": str(e)} def get_answer(self, peer_id: str) -> Dict: try: response = requests.get( f"{self.server_url}/get_answer", params={"peer_id": peer_id}, timeout=5 ) return response.json() except requests.exceptions.RequestException as e: st.error(f"Connection error: {str(e)}") return {"status": "error", "message": str(e)} def create_video_interface(): st.set_page_config( page_title="Video Call", page_icon="🎥", layout="wide", initial_sidebar_state="collapsed" ) # Initialize session state if 'connection' not in st.session_state: st.session_state.connection = WebRTCConnection() if 'status' not in st.session_state: st.session_state.status = "Initializing..." if 'peer_id' not in st.session_state: st.session_state.peer_id = None st.title("📱 Android Video Call") # Create two columns for better mobile layout col1, col2 = st.columns(2) with col1: peer_id_option = st.radio( "Peer ID Option", ("Generate", "Enter"), key="peer_id_option" ) if peer_id_option == "Generate": st.session_state.peer_id = str(uuid.uuid4())[:8] st.code(st.session_state.peer_id, language=None) else: st.session_state.peer_id = st.text_input( "Enter Peer ID", key="peer_id_input", max_chars=8, placeholder="Enter 8-character ID" ) with col2: st.metric( label="Status", value=st.session_state.status ) if st.button("Connect", key="connect_button", use_container_width=True): if st.session_state.peer_id: response = st.session_state.connection.register_peer( st.session_state.peer_id ) if response.get("status") == "registered": st.session_state.status = "Connected" st.rerun() # Using st.rerun() instead of experimental_rerun else: st.error("Please enter or generate a Peer ID") # Video container with responsive layout st.markdown(""" """, unsafe_allow_html=True) if st.session_state.status == "Connected": with st.container(): st.markdown("
", unsafe_allow_html=True) webrtc_streamer( key="local", video_processor_factory=VideoProcessor, rtc_configuration={ "iceServers": [ {"urls": ["stun:stun.l.google.com:19302"]}, { "urls": ["turn:relay.metered.ca:80"], "username": "openrelayproject", "credential": "openrelayproject" } ] }, media_stream_constraints={ "video": { "width": {"ideal": 640}, "height": {"ideal": 480}, "frameRate": {"max": 30} }, "audio": True } ) st.markdown("
", unsafe_allow_html=True) if st.button("End Call", key="end_call", use_container_width=True): st.session_state.status = "Call ended" st.rerun() # Using st.rerun() instead of experimental_rerun if __name__ == "__main__": create_video_interface()